chroma_types/
task.rs

1use serde::{Deserialize, Serialize};
2use std::time::SystemTime;
3
4use crate::CollectionUuid;
5
define_uuid_newtype!(
    /// JobId is a wrapper around Uuid to provide a unified type for job identifiers.
    /// Jobs can be either collection compaction jobs or task execution jobs.
    ///
    /// Both [`CollectionUuid`] and [`AttachedFunctionUuid`] convert into a `JobId` via `From`,
    /// reusing the same underlying UUID value, so either kind of id can key a job.
    JobId,
    new_v4
);
12
13// Custom From implementations for JobId
14impl From<CollectionUuid> for JobId {
15    fn from(collection_uuid: CollectionUuid) -> Self {
16        JobId(collection_uuid.0)
17    }
18}
19
20impl From<AttachedFunctionUuid> for JobId {
21    fn from(attached_function_uuid: AttachedFunctionUuid) -> Self {
22        JobId(attached_function_uuid.0)
23    }
24}
25
define_uuid_newtype!(
    /// AttachedFunctionUuid is a wrapper around Uuid to provide a type for attached function identifiers.
    ///
    /// Identifies an [`AttachedFunction`]. It is parsed from the proto's `id` string during
    /// conversion and can be turned into a [`JobId`] via `From`.
    #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
    AttachedFunctionUuid,
    new_v4
);
32
33/// AttachedFunction represents an asynchronous function that is triggered by collection writes
34/// to map records from a source collection to a target collection.
35fn default_systemtime() -> SystemTime {
36    SystemTime::UNIX_EPOCH
37}
38
39#[derive(Clone, Debug, Deserialize, Serialize)]
40pub struct AttachedFunction {
41    /// Unique identifier for the attached function
42    pub id: AttachedFunctionUuid,
43    /// Human-readable name for the attached function instance
44    pub name: String,
45    /// UUID of the function/built-in definition this attached function uses
46    pub function_id: uuid::Uuid,
47    /// Source collection that triggers the attached function
48    pub input_collection_id: CollectionUuid,
49    /// Name of target collection where attached function output is stored
50    pub output_collection_name: String,
51    /// ID of the output collection (lazily filled in after creation)
52    pub output_collection_id: Option<CollectionUuid>,
53    /// Optional JSON parameters for the function
54    pub params: Option<String>,
55    /// Tenant name this attached function belongs to (despite field name, this is a name not a UUID)
56    pub tenant_id: String,
57    /// Database name this attached function belongs to (despite field name, this is a name not a UUID)
58    pub database_id: String,
59    /// Timestamp of the last successful function run
60    #[serde(skip, default)]
61    pub last_run: Option<SystemTime>,
62    /// Completion offset: the WAL position up to which the attached function has processed records
63    pub completion_offset: u64,
64    /// Minimum number of new records required before the attached function runs again
65    pub min_records_for_invocation: u64,
66    /// Whether the attached function has been soft-deleted
67    #[serde(skip, default)]
68    pub is_deleted: bool,
69    /// Timestamp when the attached function was created
70    #[serde(default = "default_systemtime")]
71    pub created_at: SystemTime,
72    /// Timestamp when the attached function was last updated
73    #[serde(default = "default_systemtime")]
74    pub updated_at: SystemTime,
75    // is_ready is a column in the database, but not in the struct because
76    // it is not meant to be used in rust code. If it is false, rust code
77    // should never even see it.
78}
79
/// Errors produced when converting a protobuf `AttachedFunction` into an [`AttachedFunction`].
#[derive(Debug, thiserror::Error)]
pub enum AttachedFunctionConversionError {
    /// A UUID-typed field failed to parse; the payload is the name of the offending field
    /// (e.g. `"function_id"`), not the raw value.
    #[error("Invalid UUID: {0}")]
    InvalidUuid(String),
    /// The proto carried a non-empty `params` struct, which is not supported yet.
    #[error("Attached function params aren't supported yet")]
    ParamsNotSupported,
}
87
88impl TryFrom<crate::chroma_proto::AttachedFunction> for AttachedFunction {
89    type Error = AttachedFunctionConversionError;
90
91    fn try_from(
92        attached_function: crate::chroma_proto::AttachedFunction,
93    ) -> Result<Self, Self::Error> {
94        // Parse attached_function_id
95        let attached_function_id = attached_function
96            .id
97            .parse::<AttachedFunctionUuid>()
98            .map_err(|_| {
99                AttachedFunctionConversionError::InvalidUuid("attached_function_id".to_string())
100            })?;
101
102        // Parse function_id
103        let function_id = attached_function
104            .function_id
105            .parse::<uuid::Uuid>()
106            .map_err(|_| AttachedFunctionConversionError::InvalidUuid("function_id".to_string()))?;
107
108        // Parse input_collection_id
109        let input_collection_id = attached_function
110            .input_collection_id
111            .parse::<CollectionUuid>()
112            .map_err(|_| {
113                AttachedFunctionConversionError::InvalidUuid("input_collection_id".to_string())
114            })?;
115
116        // Parse output_collection_id if available
117        let output_collection_id = attached_function
118            .output_collection_id
119            .map(|id| id.parse::<CollectionUuid>())
120            .transpose()
121            .map_err(|_| {
122                AttachedFunctionConversionError::InvalidUuid("output_collection_id".to_string())
123            })?;
124
125        // Parse params if available - only allow empty JSON "{}" or empty struct for now.
126        // TODO(tanujnay112): Process params when we allow them
127        let params = if let Some(params_struct) = &attached_function.params {
128            if !params_struct.fields.is_empty() {
129                return Err(AttachedFunctionConversionError::ParamsNotSupported);
130            }
131            Some("{}".to_string())
132        } else {
133            None
134        };
135
136        // Parse timestamps
137        let created_at = std::time::SystemTime::UNIX_EPOCH
138            + std::time::Duration::from_micros(attached_function.created_at);
139        let updated_at = std::time::SystemTime::UNIX_EPOCH
140            + std::time::Duration::from_micros(attached_function.updated_at);
141
142        Ok(AttachedFunction {
143            id: attached_function_id,
144            name: attached_function.name,
145            function_id,
146            input_collection_id,
147            output_collection_name: attached_function.output_collection_name,
148            output_collection_id,
149            params,
150            tenant_id: attached_function.tenant_id,
151            database_id: attached_function.database_id,
152            last_run: None, // Not available in proto
153            completion_offset: attached_function.completion_offset,
154            min_records_for_invocation: attached_function.min_records_for_invocation,
155            is_deleted: false, // Not available in proto, would need to be fetched separately
156            created_at,
157            updated_at,
158        })
159    }
160}