chroma_types/
task.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::str::FromStr;
4use std::time::SystemTime;
5use uuid::Uuid;
6
7use crate::CollectionUuid;
8
/// TaskUuid is a wrapper around Uuid to provide a type for task identifiers.
///
/// The inner [`Uuid`] is a public field, so the wrapper can be constructed from
/// or unwrapped to a raw UUID directly.
///
/// Note: the derived `Default` delegates to `Uuid::default()`, which is the nil
/// (all-zero) UUID in the `uuid` crate. Use `TaskUuid::new` when a freshly
/// generated identifier is required.
#[derive(
    Copy, Clone, Debug, Default, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize,
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TaskUuid(pub Uuid);
15
16impl TaskUuid {
17    pub fn new() -> Self {
18        TaskUuid(Uuid::new_v4())
19    }
20}
21
22impl std::str::FromStr for TaskUuid {
23    type Err = uuid::Error;
24
25    fn from_str(s: &str) -> Result<Self, Self::Err> {
26        match Uuid::parse_str(s) {
27            Ok(uuid) => Ok(TaskUuid(uuid)),
28            Err(err) => Err(err),
29        }
30    }
31}
32
33impl std::fmt::Display for TaskUuid {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "{}", self.0)
36    }
37}
38
/// Task represents an asynchronous task that is triggered by collection writes
/// to map records from a source collection to a target collection.
///
/// Fields marked `#[serde(skip, default)]` are excluded from serde
/// serialization; when a `Task` is deserialized they are filled with their
/// `Default` value (`None` / `false`) rather than read from the input.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Task {
    /// Unique identifier for the task
    pub id: TaskUuid,
    /// Human-readable name for the task instance
    pub name: String,
    /// Name of the operator/built-in definition this task uses (despite field name, this is a name not a UUID)
    pub operator_id: String,
    /// Source collection that triggers the task
    pub input_collection_id: CollectionUuid,
    /// Name of target collection where task output is stored
    pub output_collection_name: String,
    /// ID of the output collection (lazily filled in after creation).
    /// Stored as a plain string; `None` until it has been filled in.
    pub output_collection_id: Option<String>,
    /// Optional JSON parameters for the operator, kept as an unparsed string
    pub params: Option<String>,
    /// Tenant name this task belongs to (despite field name, this is a name not a UUID)
    pub tenant_id: String,
    /// Database name this task belongs to (despite field name, this is a name not a UUID)
    pub database_id: String,
    /// Timestamp of the last successful task run.
    /// Skipped by serde; deserializes to `None`.
    #[serde(skip, default)]
    pub last_run: Option<SystemTime>,
    /// Timestamp when the task should next run (None if not yet scheduled).
    /// Skipped by serde; deserializes to `None`.
    #[serde(skip, default)]
    pub next_run: Option<SystemTime>,
    /// Completion offset: the WAL position up to which the task has processed records
    pub completion_offset: u64,
    /// Minimum number of new records required before the task runs again
    pub min_records_for_task: u64,
    /// Whether the task has been soft-deleted.
    /// Skipped by serde; deserializes to `false`.
    #[serde(skip, default)]
    pub is_deleted: bool,
    /// Timestamp when the task was created
    pub created_at: SystemTime,
    /// Timestamp when the task was last updated
    pub updated_at: SystemTime,
}
79
/// ScheduleEntry represents a scheduled task run for a collection.
///
/// It can be built from the protobuf `chroma_proto::ScheduleEntry` message via
/// `TryFrom`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScheduleEntry {
    /// Collection this schedule entry belongs to.
    pub collection_id: CollectionUuid,
    /// Identifier of the task, held as a raw `Uuid` (not wrapped in `TaskUuid`).
    pub task_id: Uuid,
    /// Nonce UUID associated with the task run.
    /// NOTE(review): the exact semantics of the nonce are not visible in this file.
    pub task_run_nonce: Uuid,
    /// UTC instant at which the run is scheduled; `None` when no time is set.
    /// The protobuf conversion derives this from a millisecond timestamp.
    pub when_to_run: Option<DateTime<Utc>>,
}
88
89impl TryFrom<crate::chroma_proto::ScheduleEntry> for ScheduleEntry {
90    type Error = ScheduleEntryConversionError;
91
92    fn try_from(proto: crate::chroma_proto::ScheduleEntry) -> Result<Self, Self::Error> {
93        let collection_id = proto
94            .collection_id
95            .ok_or(ScheduleEntryConversionError::MissingField(
96                "collection_id".to_string(),
97            ))
98            .and_then(|id| {
99                CollectionUuid::from_str(&id).map_err(|_| {
100                    ScheduleEntryConversionError::InvalidUuid("collection_id".to_string())
101                })
102            })?;
103
104        let task_id = proto
105            .task_id
106            .ok_or(ScheduleEntryConversionError::MissingField(
107                "task_id".to_string(),
108            ))
109            .and_then(|id| {
110                Uuid::parse_str(&id)
111                    .map_err(|_| ScheduleEntryConversionError::InvalidUuid("task_id".to_string()))
112            })?;
113
114        let task_run_nonce = proto
115            .task_run_nonce
116            .ok_or(ScheduleEntryConversionError::MissingField(
117                "task_run_nonce".to_string(),
118            ))
119            .and_then(|nonce| {
120                Uuid::parse_str(&nonce).map_err(|_| {
121                    ScheduleEntryConversionError::InvalidUuid("task_run_nonce".to_string())
122                })
123            })?;
124
125        let when_to_run = proto
126            .when_to_run
127            .and_then(|ms| DateTime::from_timestamp_millis(ms as i64));
128
129        Ok(ScheduleEntry {
130            collection_id,
131            task_id,
132            task_run_nonce,
133            when_to_run,
134        })
135    }
136}
137
/// Errors produced when converting a protobuf `ScheduleEntry` into
/// [`ScheduleEntry`].
///
/// Each variant carries the name of the offending field.
#[derive(Debug, thiserror::Error)]
pub enum ScheduleEntryConversionError {
    /// A required field was not set on the protobuf message.
    #[error("Missing required field: {0}")]
    MissingField(String),
    /// A field was present but its value could not be parsed as a UUID.
    #[error("Invalid UUID for field: {0}")]
    InvalidUuid(String),
}