Skip to main content

chroma_types/
flush.rs

1use super::{AttachedFunctionUuid, CollectionUuid, ConversionError, DatabaseName, Schema};
2use crate::{
3    chroma_proto::{self, FilePaths, FlushSegmentCompactionInfo},
4    SegmentUuid,
5};
6use chroma_error::{ChromaError, ErrorCodes};
7use std::{collections::HashMap, sync::Arc};
8use thiserror::Error;
9use uuid::Uuid;
10
11#[derive(Debug, Clone)]
12pub struct SegmentFlushInfo {
13    pub segment_id: SegmentUuid,
14    pub file_paths: HashMap<String, Vec<String>>,
15}
16
17#[derive(Debug, Clone)]
18pub struct CollectionFlushInfo {
19    pub tenant_id: String,
20    pub database_name: DatabaseName,
21    pub collection_id: CollectionUuid,
22    pub log_position: i64,
23    pub collection_version: i32,
24    pub segment_flush_info: Arc<[SegmentFlushInfo]>,
25    pub total_records_post_compaction: u64,
26    pub size_bytes_post_compaction: u64,
27    pub schema: Option<Schema>,
28}
29
30#[derive(Debug, Clone)]
31pub struct AttachedFunctionUpdateInfo {
32    pub attached_function_id: AttachedFunctionUuid,
33    pub completion_offset: u64,
34}
35
36#[derive(Error, Debug)]
37pub enum FinishAttachedFunctionError {
38    #[error("Failed to finish attached function: {0}")]
39    FailedToFinishAttachedFunction(#[from] tonic::Status),
40    #[error("Attached function not found")]
41    AttachedFunctionNotFound,
42}
43
44impl ChromaError for FinishAttachedFunctionError {
45    fn code(&self) -> ErrorCodes {
46        match self {
47            FinishAttachedFunctionError::FailedToFinishAttachedFunction(_) => ErrorCodes::Internal,
48            FinishAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
49        }
50    }
51}
52
53#[derive(Error, Debug)]
54pub enum FinishCreateAttachedFunctionError {
55    #[error("Failed to finish creating attached function: {0}")]
56    FailedToFinishCreateAttachedFunction(#[from] tonic::Status),
57    #[error("Attached function not found")]
58    AttachedFunctionNotFound,
59    #[error("Output collection already exists")]
60    OutputCollectionExists,
61}
62
63impl ChromaError for FinishCreateAttachedFunctionError {
64    fn code(&self) -> ErrorCodes {
65        match self {
66            FinishCreateAttachedFunctionError::FailedToFinishCreateAttachedFunction(_) => {
67                ErrorCodes::Internal
68            }
69            FinishCreateAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
70            FinishCreateAttachedFunctionError::OutputCollectionExists => ErrorCodes::AlreadyExists,
71        }
72    }
73}
74
75#[derive(Error, Debug)]
76pub enum GetMinCompletionOffsetError {
77    #[error("Failed to get min completion offset: {0}")]
78    FailedToGetMinCompletionOffset(#[from] tonic::Status),
79}
80
81impl ChromaError for GetMinCompletionOffsetError {
82    fn code(&self) -> ErrorCodes {
83        ErrorCodes::Internal
84    }
85}
86
87#[derive(Error, Debug)]
88pub enum AdvanceAttachedFunctionError {
89    #[error("Failed to advance attached function: {0}")]
90    FailedToAdvanceAttachedFunction(#[from] tonic::Status),
91    #[error("Attached function not found - nonce mismatch or attached function doesn't exist")]
92    AttachedFunctionNotFound,
93}
94
95impl ChromaError for AdvanceAttachedFunctionError {
96    fn code(&self) -> ErrorCodes {
97        match self {
98            AdvanceAttachedFunctionError::FailedToAdvanceAttachedFunction(_) => {
99                ErrorCodes::Internal
100            }
101            AdvanceAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
102        }
103    }
104}
105
/// Response carrying the completion offset from an advance-attached-function call.
#[derive(Clone, Debug)]
pub struct AdvanceAttachedFunctionResponse {
    /// Completion offset carried by the response.
    pub completion_offset: u64,
}
110
111impl TryInto<FlushSegmentCompactionInfo> for &SegmentFlushInfo {
112    type Error = SegmentFlushInfoConversionError;
113
114    fn try_into(self) -> Result<FlushSegmentCompactionInfo, Self::Error> {
115        let mut file_paths = HashMap::new();
116        for (key, value) in self.file_paths.clone() {
117            file_paths.insert(key, FilePaths { paths: value });
118        }
119
120        Ok(FlushSegmentCompactionInfo {
121            segment_id: self.segment_id.to_string(),
122            file_paths,
123        })
124    }
125}
126
127#[derive(Error, Debug)]
128pub enum SegmentFlushInfoConversionError {
129    #[error("Invalid segment id, valid UUID required")]
130    InvalidSegmentId,
131    #[error(transparent)]
132    DecodeError(#[from] ConversionError),
133}
134
135#[derive(Error, Debug)]
136pub enum CollectionFlushInfoConversionError {
137    #[error("Failed to convert segment flush info: {0}")]
138    SegmentConversionError(#[from] SegmentFlushInfoConversionError),
139    #[error("Failed to serialize schema")]
140    SchemaSerializationError,
141}
142
143impl TryFrom<CollectionFlushInfo> for chroma_proto::FlushCollectionCompactionRequest {
144    type Error = CollectionFlushInfoConversionError;
145
146    fn try_from(collection: CollectionFlushInfo) -> Result<Self, Self::Error> {
147        let segment_compaction_info = collection
148            .segment_flush_info
149            .iter()
150            .map(|segment_flush_info| segment_flush_info.try_into())
151            .collect::<Result<Vec<_>, _>>()?;
152
153        let schema_str = collection
154            .schema
155            .map(|s| {
156                serde_json::to_string(&s)
157                    .map_err(|_| CollectionFlushInfoConversionError::SchemaSerializationError)
158            })
159            .transpose()?;
160
161        Ok(crate::chroma_proto::FlushCollectionCompactionRequest {
162            tenant_id: collection.tenant_id,
163            collection_id: collection.collection_id.0.to_string(),
164            log_position: collection.log_position,
165            collection_version: collection.collection_version,
166            segment_compaction_info,
167            total_records_post_compaction: collection.total_records_post_compaction,
168            size_bytes_post_compaction: collection.size_bytes_post_compaction,
169            schema_str,
170            database_name: Some(collection.database_name.as_ref().to_string()),
171        })
172    }
173}
174
175#[derive(Debug)]
176pub struct FlushCompactionResponse {
177    pub collection_id: CollectionUuid,
178    pub collection_version: i32,
179    pub last_compaction_time: i64,
180}
181
182#[derive(Debug)]
183pub struct FlushCompactionAndAttachedFunctionResponse {
184    pub collections: Vec<FlushCompactionResponse>,
185    // Completion offset updated during register
186    pub completion_offset: u64,
187}
188
189impl FlushCompactionResponse {
190    pub fn new(
191        collection_id: CollectionUuid,
192        collection_version: i32,
193        last_compaction_time: i64,
194    ) -> Self {
195        FlushCompactionResponse {
196            collection_id,
197            collection_version,
198            last_compaction_time,
199        }
200    }
201}
202
203impl TryFrom<chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse>
204    for FlushCompactionAndAttachedFunctionResponse
205{
206    type Error = FlushCompactionResponseConversionError;
207
208    fn try_from(
209        value: chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse,
210    ) -> Result<Self, Self::Error> {
211        // Parse all collections from the repeated field
212        let mut collections = Vec::with_capacity(value.collections.len());
213        for collection in value.collections {
214            let id = Uuid::parse_str(&collection.collection_id)
215                .map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
216            collections.push(FlushCompactionResponse {
217                collection_id: CollectionUuid(id),
218                collection_version: collection.collection_version,
219                last_compaction_time: collection.last_compaction_time,
220            });
221        }
222
223        // Extract completion_offset from attached_function_state
224        // Note: next_nonce and next_run are no longer used by the client
225        // They were already set by PrepareAttachedFunction via advance_attached_function()
226        let completion_offset = value
227            .attached_function_state
228            .as_ref()
229            .map(|state| state.completion_offset)
230            .unwrap_or(0);
231
232        Ok(FlushCompactionAndAttachedFunctionResponse {
233            collections,
234            completion_offset,
235        })
236    }
237}
238
239#[derive(Error, Debug)]
240pub enum FlushCompactionResponseConversionError {
241    #[error(transparent)]
242    DecodeError(#[from] ConversionError),
243    #[error("Invalid collection id, valid UUID required")]
244    InvalidUuid,
245    #[error("Invalid attached function nonce, valid UUID required")]
246    InvalidAttachedFunctionNonce,
247    #[error("Invalid timestamp format")]
248    InvalidTimestamp,
249    #[error("Missing collections in response")]
250    MissingCollections,
251}
252
253impl ChromaError for FlushCompactionResponseConversionError {
254    fn code(&self) -> ErrorCodes {
255        match self {
256            FlushCompactionResponseConversionError::InvalidUuid => ErrorCodes::InvalidArgument,
257            FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
258                ErrorCodes::InvalidArgument
259            }
260            FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
261            FlushCompactionResponseConversionError::MissingCollections => {
262                ErrorCodes::InvalidArgument
263            }
264            FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
265        }
266    }
267}