chroma_types/
flush.rs

1use super::{AttachedFunctionUuid, CollectionUuid, ConversionError, Schema};
2use crate::{
3    chroma_proto::{self, FilePaths, FlushSegmentCompactionInfo},
4    SegmentUuid,
5};
6use chroma_error::{ChromaError, ErrorCodes};
7use std::{collections::HashMap, sync::Arc};
8use thiserror::Error;
9use uuid::Uuid;
10
11#[derive(Debug, Clone)]
12pub struct SegmentFlushInfo {
13    pub segment_id: SegmentUuid,
14    pub file_paths: HashMap<String, Vec<String>>,
15}
16
17#[derive(Debug, Clone)]
18pub struct CollectionFlushInfo {
19    pub tenant_id: String,
20    pub collection_id: CollectionUuid,
21    pub log_position: i64,
22    pub collection_version: i32,
23    pub segment_flush_info: Arc<[SegmentFlushInfo]>,
24    pub total_records_post_compaction: u64,
25    pub size_bytes_post_compaction: u64,
26    pub schema: Option<Schema>,
27}
28
29#[derive(Debug, Clone)]
30pub struct AttachedFunctionUpdateInfo {
31    pub attached_function_id: AttachedFunctionUuid,
32    pub completion_offset: u64,
33}
34
35#[derive(Error, Debug)]
36pub enum FinishAttachedFunctionError {
37    #[error("Failed to finish attached function: {0}")]
38    FailedToFinishAttachedFunction(#[from] tonic::Status),
39    #[error("Attached function not found")]
40    AttachedFunctionNotFound,
41}
42
43impl ChromaError for FinishAttachedFunctionError {
44    fn code(&self) -> ErrorCodes {
45        match self {
46            FinishAttachedFunctionError::FailedToFinishAttachedFunction(_) => ErrorCodes::Internal,
47            FinishAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
48        }
49    }
50}
51
52#[derive(Error, Debug)]
53pub enum FinishCreateAttachedFunctionError {
54    #[error("Failed to finish creating attached function: {0}")]
55    FailedToFinishCreateAttachedFunction(#[from] tonic::Status),
56    #[error("Attached function not found")]
57    AttachedFunctionNotFound,
58    #[error("Output collection already exists")]
59    OutputCollectionExists,
60}
61
62impl ChromaError for FinishCreateAttachedFunctionError {
63    fn code(&self) -> ErrorCodes {
64        match self {
65            FinishCreateAttachedFunctionError::FailedToFinishCreateAttachedFunction(_) => {
66                ErrorCodes::Internal
67            }
68            FinishCreateAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
69            FinishCreateAttachedFunctionError::OutputCollectionExists => ErrorCodes::AlreadyExists,
70        }
71    }
72}
73
74#[derive(Error, Debug)]
75pub enum GetMinCompletionOffsetError {
76    #[error("Failed to get min completion offset: {0}")]
77    FailedToGetMinCompletionOffset(#[from] tonic::Status),
78}
79
80impl ChromaError for GetMinCompletionOffsetError {
81    fn code(&self) -> ErrorCodes {
82        ErrorCodes::Internal
83    }
84}
85
86#[derive(Error, Debug)]
87pub enum AdvanceAttachedFunctionError {
88    #[error("Failed to advance attached function: {0}")]
89    FailedToAdvanceAttachedFunction(#[from] tonic::Status),
90    #[error("Attached function not found - nonce mismatch or attached function doesn't exist")]
91    AttachedFunctionNotFound,
92}
93
94impl ChromaError for AdvanceAttachedFunctionError {
95    fn code(&self) -> ErrorCodes {
96        match self {
97            AdvanceAttachedFunctionError::FailedToAdvanceAttachedFunction(_) => {
98                ErrorCodes::Internal
99            }
100            AdvanceAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
101        }
102    }
103}
104
/// Response carrying the completion offset of an advanced attached function.
#[derive(Clone, Debug)]
pub struct AdvanceAttachedFunctionResponse {
    /// Completion offset reported by the response.
    pub completion_offset: u64,
}
109
110impl TryInto<FlushSegmentCompactionInfo> for &SegmentFlushInfo {
111    type Error = SegmentFlushInfoConversionError;
112
113    fn try_into(self) -> Result<FlushSegmentCompactionInfo, Self::Error> {
114        let mut file_paths = HashMap::new();
115        for (key, value) in self.file_paths.clone() {
116            file_paths.insert(key, FilePaths { paths: value });
117        }
118
119        Ok(FlushSegmentCompactionInfo {
120            segment_id: self.segment_id.to_string(),
121            file_paths,
122        })
123    }
124}
125
126#[derive(Error, Debug)]
127pub enum SegmentFlushInfoConversionError {
128    #[error("Invalid segment id, valid UUID required")]
129    InvalidSegmentId,
130    #[error(transparent)]
131    DecodeError(#[from] ConversionError),
132}
133
134#[derive(Error, Debug)]
135pub enum CollectionFlushInfoConversionError {
136    #[error("Failed to convert segment flush info: {0}")]
137    SegmentConversionError(#[from] SegmentFlushInfoConversionError),
138    #[error("Failed to serialize schema")]
139    SchemaSerializationError,
140}
141
142impl TryFrom<CollectionFlushInfo> for chroma_proto::FlushCollectionCompactionRequest {
143    type Error = CollectionFlushInfoConversionError;
144
145    fn try_from(collection: CollectionFlushInfo) -> Result<Self, Self::Error> {
146        let segment_compaction_info = collection
147            .segment_flush_info
148            .iter()
149            .map(|segment_flush_info| segment_flush_info.try_into())
150            .collect::<Result<Vec<_>, _>>()?;
151
152        let schema_str = collection
153            .schema
154            .map(|s| {
155                serde_json::to_string(&s)
156                    .map_err(|_| CollectionFlushInfoConversionError::SchemaSerializationError)
157            })
158            .transpose()?;
159
160        Ok(crate::chroma_proto::FlushCollectionCompactionRequest {
161            tenant_id: collection.tenant_id,
162            collection_id: collection.collection_id.0.to_string(),
163            log_position: collection.log_position,
164            collection_version: collection.collection_version,
165            segment_compaction_info,
166            total_records_post_compaction: collection.total_records_post_compaction,
167            size_bytes_post_compaction: collection.size_bytes_post_compaction,
168            schema_str,
169        })
170    }
171}
172
173#[derive(Debug)]
174pub struct FlushCompactionResponse {
175    pub collection_id: CollectionUuid,
176    pub collection_version: i32,
177    pub last_compaction_time: i64,
178}
179
180#[derive(Debug)]
181pub struct FlushCompactionAndAttachedFunctionResponse {
182    pub collections: Vec<FlushCompactionResponse>,
183    // Completion offset updated during register
184    pub completion_offset: u64,
185}
186
187impl FlushCompactionResponse {
188    pub fn new(
189        collection_id: CollectionUuid,
190        collection_version: i32,
191        last_compaction_time: i64,
192    ) -> Self {
193        FlushCompactionResponse {
194            collection_id,
195            collection_version,
196            last_compaction_time,
197        }
198    }
199}
200
201impl TryFrom<chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse>
202    for FlushCompactionAndAttachedFunctionResponse
203{
204    type Error = FlushCompactionResponseConversionError;
205
206    fn try_from(
207        value: chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse,
208    ) -> Result<Self, Self::Error> {
209        // Parse all collections from the repeated field
210        let mut collections = Vec::with_capacity(value.collections.len());
211        for collection in value.collections {
212            let id = Uuid::parse_str(&collection.collection_id)
213                .map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
214            collections.push(FlushCompactionResponse {
215                collection_id: CollectionUuid(id),
216                collection_version: collection.collection_version,
217                last_compaction_time: collection.last_compaction_time,
218            });
219        }
220
221        // Extract completion_offset from attached_function_state
222        // Note: next_nonce and next_run are no longer used by the client
223        // They were already set by PrepareAttachedFunction via advance_attached_function()
224        let completion_offset = value
225            .attached_function_state
226            .as_ref()
227            .map(|state| state.completion_offset)
228            .unwrap_or(0);
229
230        Ok(FlushCompactionAndAttachedFunctionResponse {
231            collections,
232            completion_offset,
233        })
234    }
235}
236
237#[derive(Error, Debug)]
238pub enum FlushCompactionResponseConversionError {
239    #[error(transparent)]
240    DecodeError(#[from] ConversionError),
241    #[error("Invalid collection id, valid UUID required")]
242    InvalidUuid,
243    #[error("Invalid attached function nonce, valid UUID required")]
244    InvalidAttachedFunctionNonce,
245    #[error("Invalid timestamp format")]
246    InvalidTimestamp,
247    #[error("Missing collections in response")]
248    MissingCollections,
249}
250
251impl ChromaError for FlushCompactionResponseConversionError {
252    fn code(&self) -> ErrorCodes {
253        match self {
254            FlushCompactionResponseConversionError::InvalidUuid => ErrorCodes::InvalidArgument,
255            FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
256                ErrorCodes::InvalidArgument
257            }
258            FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
259            FlushCompactionResponseConversionError::MissingCollections => {
260                ErrorCodes::InvalidArgument
261            }
262            FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
263        }
264    }
265}