chroma_types/
task.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::str::FromStr;
4use std::time::SystemTime;
5use uuid::Uuid;
6
7use crate::CollectionUuid;
8
define_uuid_newtype!(
    /// JobId is a wrapper around Uuid to provide a unified type for job identifiers.
    /// Jobs can be either collection compaction jobs or task execution jobs.
    ///
    /// A JobId can also be obtained from a `CollectionUuid` or an
    /// `AttachedFunctionUuid` via `From`; those conversions reuse the source UUID
    /// unchanged rather than generating a new one.
    JobId,
    // NOTE(review): `new_v4` presumably tells the macro to mint fresh ids with
    // `Uuid::new_v4` (random) — confirm against the macro definition.
    new_v4
);
15
16// Custom From implementations for JobId
17impl From<CollectionUuid> for JobId {
18    fn from(collection_uuid: CollectionUuid) -> Self {
19        JobId(collection_uuid.0)
20    }
21}
22
23impl From<AttachedFunctionUuid> for JobId {
24    fn from(attached_function_uuid: AttachedFunctionUuid) -> Self {
25        JobId(attached_function_uuid.0)
26    }
27}
28
define_uuid_newtype!(
    /// AttachedFunctionUuid is a wrapper around Uuid to provide a type for attached function identifiers.
    // Derives `utoipa::ToSchema` only when the crate's `utoipa` feature is enabled.
    #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
    AttachedFunctionUuid,
    // NOTE(review): `new_v4` presumably selects `Uuid::new_v4` (random) for fresh
    // ids — confirm against the macro definition.
    new_v4
);
35
define_uuid_newtype!(
    /// NonceUuid is a wrapper around Uuid to provide a type for attached function execution nonces.
    ///
    /// Nonces are UUIDv7 values, which embed a timestamp; this is presumably why
    /// they are used where nonce ordering matters (e.g. comparing
    /// `lowest_live_nonce` against `next_nonce` on `AttachedFunction`).
    NonceUuid,
    // NOTE(review): `now_v7` presumably selects `Uuid::now_v7` for fresh nonces —
    // confirm against the macro definition.
    now_v7
);
41
42/// AttachedFunction represents an asynchronous function that is triggered by collection writes
43/// to map records from a source collection to a target collection.
44fn default_systemtime() -> SystemTime {
45    SystemTime::UNIX_EPOCH
46}
47
48#[derive(Clone, Debug, Deserialize, Serialize)]
49pub struct AttachedFunction {
50    /// Unique identifier for the attached function
51    pub id: AttachedFunctionUuid,
52    /// Human-readable name for the attached function instance
53    pub name: String,
54    /// UUID of the function/built-in definition this attached function uses
55    pub function_id: uuid::Uuid,
56    /// Source collection that triggers the attached function
57    pub input_collection_id: CollectionUuid,
58    /// Name of target collection where attached function output is stored
59    pub output_collection_name: String,
60    /// ID of the output collection (lazily filled in after creation)
61    pub output_collection_id: Option<CollectionUuid>,
62    /// Optional JSON parameters for the function
63    pub params: Option<String>,
64    /// Tenant name this attached function belongs to (despite field name, this is a name not a UUID)
65    pub tenant_id: String,
66    /// Database name this attached function belongs to (despite field name, this is a name not a UUID)
67    pub database_id: String,
68    /// Timestamp of the last successful function run
69    #[serde(skip, default)]
70    pub last_run: Option<SystemTime>,
71    /// Timestamp when the attached function should next run
72    #[serde(skip, default = "default_systemtime")]
73    pub next_run: SystemTime,
74    /// Completion offset: the WAL position up to which the attached function has processed records
75    pub completion_offset: u64,
76    /// Minimum number of new records required before the attached function runs again
77    pub min_records_for_invocation: u64,
78    /// Whether the attached function has been soft-deleted
79    #[serde(skip, default)]
80    pub is_deleted: bool,
81    /// Timestamp when the attached function was created
82    #[serde(default = "default_systemtime")]
83    pub created_at: SystemTime,
84    /// Timestamp when the attached function was last updated
85    #[serde(default = "default_systemtime")]
86    pub updated_at: SystemTime,
87    /// Next nonce (UUIDv7) for execution tracking
88    pub next_nonce: NonceUuid,
89    /// Lowest live nonce (UUIDv7) - marks the earliest epoch that still needs verification
90    /// When lowest_live_nonce is Some and < next_nonce, it indicates finish failed and we should
91    /// skip execution and only run the scout_logs recheck phase
92    /// None indicates the attached function has never been scheduled (brand new)
93    pub lowest_live_nonce: Option<NonceUuid>,
94}
95
/// ScheduleEntry represents a scheduled attached function run for a collection.
///
/// Usually built from the protobuf `ScheduleEntry` via `TryFrom`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScheduleEntry {
    /// Collection this scheduled run belongs to.
    pub collection_id: CollectionUuid,
    /// Identifier of the attached function to run.
    /// NOTE(review): stored as a raw `Uuid` rather than `AttachedFunctionUuid`.
    pub attached_function_id: Uuid,
    /// Nonce identifying this particular run (the proto's `run_nonce` field).
    pub attached_function_run_nonce: NonceUuid,
    /// When the run should happen; `None` when the source message carried no
    /// usable timestamp.
    pub when_to_run: Option<DateTime<Utc>>,
    /// Lowest live nonce - marks the earliest nonce that still needs verification.
    /// Nonces less than this value are considered complete.
    pub lowest_live_nonce: Option<Uuid>,
}
107
108impl TryFrom<crate::chroma_proto::ScheduleEntry> for ScheduleEntry {
109    type Error = ScheduleEntryConversionError;
110
111    fn try_from(proto: crate::chroma_proto::ScheduleEntry) -> Result<Self, Self::Error> {
112        let collection_id = proto
113            .collection_id
114            .ok_or(ScheduleEntryConversionError::MissingField(
115                "collection_id".to_string(),
116            ))
117            .and_then(|id| {
118                CollectionUuid::from_str(&id).map_err(|_| {
119                    ScheduleEntryConversionError::InvalidUuid("collection_id".to_string())
120                })
121            })?;
122
123        let attached_function_id = proto
124            .attached_function_id
125            .ok_or(ScheduleEntryConversionError::MissingField(
126                "attached_function_id".to_string(),
127            ))
128            .and_then(|id| {
129                Uuid::parse_str(&id).map_err(|_| {
130                    ScheduleEntryConversionError::InvalidUuid("attached_function_id".to_string())
131                })
132            })?;
133
134        let attached_function_run_nonce = proto
135            .run_nonce
136            .ok_or(ScheduleEntryConversionError::MissingField(
137                "run_nonce".to_string(),
138            ))
139            .and_then(|nonce| {
140                Uuid::parse_str(&nonce)
141                    .map(NonceUuid)
142                    .map_err(|_| ScheduleEntryConversionError::InvalidUuid("run_nonce".to_string()))
143            })?;
144
145        let when_to_run = proto
146            .when_to_run
147            .and_then(|ms| DateTime::from_timestamp_millis(ms as i64));
148
149        let lowest_live_nonce = proto
150            .lowest_live_nonce
151            .as_ref()
152            .and_then(|nonce_str| Uuid::parse_str(nonce_str).ok());
153
154        Ok(ScheduleEntry {
155            collection_id,
156            attached_function_id,
157            attached_function_run_nonce,
158            when_to_run,
159            lowest_live_nonce,
160        })
161    }
162}
163
164#[derive(Debug, thiserror::Error)]
165pub enum ScheduleEntryConversionError {
166    #[error("Missing required field: {0}")]
167    MissingField(String),
168    #[error("Invalid UUID for field: {0}")]
169    InvalidUuid(String),
170}