chroma_types/
flush.rs

1use super::{AttachedFunctionUuid, CollectionUuid, ConversionError};
2use crate::{
3    chroma_proto::{
4        FilePaths, FlushCollectionCompactionAndAttachedFunctionResponse, FlushSegmentCompactionInfo,
5    },
6    SegmentUuid,
7};
8use chroma_error::{ChromaError, ErrorCodes};
9use std::collections::HashMap;
10use thiserror::Error;
11use uuid::Uuid;
12
13#[derive(Debug, Clone)]
14pub struct SegmentFlushInfo {
15    pub segment_id: SegmentUuid,
16    pub file_paths: HashMap<String, Vec<String>>,
17}
18
19#[derive(Debug, Clone)]
20pub struct AttachedFunctionUpdateInfo {
21    pub attached_function_id: AttachedFunctionUuid,
22    pub attached_function_run_nonce: uuid::Uuid,
23    pub completion_offset: u64,
24}
25
26#[derive(Error, Debug)]
27pub enum FinishAttachedFunctionError {
28    #[error("Failed to finish attached function: {0}")]
29    FailedToFinishAttachedFunction(#[from] tonic::Status),
30    #[error("Attached function not found")]
31    AttachedFunctionNotFound,
32}
33
34impl ChromaError for FinishAttachedFunctionError {
35    fn code(&self) -> ErrorCodes {
36        match self {
37            FinishAttachedFunctionError::FailedToFinishAttachedFunction(_) => ErrorCodes::Internal,
38            FinishAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
39        }
40    }
41}
42
43#[derive(Error, Debug)]
44pub enum GetMinCompletionOffsetError {
45    #[error("Failed to get min completion offset: {0}")]
46    FailedToGetMinCompletionOffset(#[from] tonic::Status),
47}
48
49impl ChromaError for GetMinCompletionOffsetError {
50    fn code(&self) -> ErrorCodes {
51        ErrorCodes::Internal
52    }
53}
54
55#[derive(Error, Debug)]
56pub enum AdvanceAttachedFunctionError {
57    #[error("Failed to advance attached function: {0}")]
58    FailedToAdvanceAttachedFunction(#[from] tonic::Status),
59    #[error("Attached function not found - nonce mismatch or attached function doesn't exist")]
60    AttachedFunctionNotFound,
61}
62
63impl ChromaError for AdvanceAttachedFunctionError {
64    fn code(&self) -> ErrorCodes {
65        match self {
66            AdvanceAttachedFunctionError::FailedToAdvanceAttachedFunction(_) => {
67                ErrorCodes::Internal
68            }
69            AdvanceAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
70        }
71    }
72}
73
74#[derive(Debug, Clone)]
75pub struct AdvanceAttachedFunctionResponse {
76    pub next_nonce: uuid::Uuid,
77    pub next_run: std::time::SystemTime,
78    pub completion_offset: u64,
79}
80
81impl TryInto<FlushSegmentCompactionInfo> for &SegmentFlushInfo {
82    type Error = SegmentFlushInfoConversionError;
83
84    fn try_into(self) -> Result<FlushSegmentCompactionInfo, Self::Error> {
85        let mut file_paths = HashMap::new();
86        for (key, value) in self.file_paths.clone() {
87            file_paths.insert(key, FilePaths { paths: value });
88        }
89
90        Ok(FlushSegmentCompactionInfo {
91            segment_id: self.segment_id.to_string(),
92            file_paths,
93        })
94    }
95}
96
97#[derive(Error, Debug)]
98pub enum SegmentFlushInfoConversionError {
99    #[error("Invalid segment id, valid UUID required")]
100    InvalidSegmentId,
101    #[error(transparent)]
102    DecodeError(#[from] ConversionError),
103}
104
105#[derive(Debug)]
106pub struct FlushCompactionResponse {
107    pub collection_id: CollectionUuid,
108    pub collection_version: i32,
109    pub last_compaction_time: i64,
110}
111
112#[derive(Debug)]
113pub struct FlushCompactionAndAttachedFunctionResponse {
114    pub collection_id: CollectionUuid,
115    pub collection_version: i32,
116    pub last_compaction_time: i64,
117    // Completion offset updated during register
118    pub completion_offset: u64,
119    // NOTE: next_nonce and next_run are no longer returned
120    // They were already set by PrepareAttachedFunction via advance_attached_function()
121}
122
123impl FlushCompactionResponse {
124    pub fn new(
125        collection_id: CollectionUuid,
126        collection_version: i32,
127        last_compaction_time: i64,
128    ) -> Self {
129        FlushCompactionResponse {
130            collection_id,
131            collection_version,
132            last_compaction_time,
133        }
134    }
135}
136
137impl TryFrom<FlushCollectionCompactionAndAttachedFunctionResponse> for FlushCompactionResponse {
138    type Error = FlushCompactionResponseConversionError;
139
140    fn try_from(
141        value: FlushCollectionCompactionAndAttachedFunctionResponse,
142    ) -> Result<Self, Self::Error> {
143        let id = Uuid::parse_str(&value.collection_id)
144            .map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
145        Ok(FlushCompactionResponse {
146            collection_id: CollectionUuid(id),
147            collection_version: value.collection_version,
148            last_compaction_time: value.last_compaction_time,
149        })
150    }
151}
152
153impl TryFrom<FlushCollectionCompactionAndAttachedFunctionResponse>
154    for FlushCompactionAndAttachedFunctionResponse
155{
156    type Error = FlushCompactionResponseConversionError;
157
158    fn try_from(
159        value: FlushCollectionCompactionAndAttachedFunctionResponse,
160    ) -> Result<Self, Self::Error> {
161        let id = Uuid::parse_str(&value.collection_id)
162            .map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
163
164        // Note: next_nonce and next_run are no longer populated by the server
165        // They were already set by PrepareAttachedFunction via advance_attached_function()
166        // We only use completion_offset from the response
167
168        Ok(FlushCompactionAndAttachedFunctionResponse {
169            collection_id: CollectionUuid(id),
170            collection_version: value.collection_version,
171            last_compaction_time: value.last_compaction_time,
172            completion_offset: value.completion_offset,
173        })
174    }
175}
176
177#[derive(Error, Debug)]
178pub enum FlushCompactionResponseConversionError {
179    #[error(transparent)]
180    DecodeError(#[from] ConversionError),
181    #[error("Invalid collection id, valid UUID required")]
182    InvalidUuid,
183    #[error("Invalid attached function nonce, valid UUID required")]
184    InvalidAttachedFunctionNonce,
185    #[error("Missing next_run timestamp")]
186    MissingNextRun,
187    #[error("Invalid timestamp format")]
188    InvalidTimestamp,
189}
190
191impl ChromaError for FlushCompactionResponseConversionError {
192    fn code(&self) -> ErrorCodes {
193        match self {
194            FlushCompactionResponseConversionError::InvalidUuid => ErrorCodes::InvalidArgument,
195            FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
196                ErrorCodes::InvalidArgument
197            }
198            FlushCompactionResponseConversionError::MissingNextRun => ErrorCodes::InvalidArgument,
199            FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
200            FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
201        }
202    }
203}