1use super::{AttachedFunctionUuid, CollectionUuid, ConversionError, DatabaseName, Schema};
2use crate::{
3 chroma_proto::{self, FilePaths, FlushSegmentCompactionInfo},
4 SegmentUuid,
5};
6use chroma_error::{ChromaError, ErrorCodes};
7use std::{collections::HashMap, sync::Arc};
8use thiserror::Error;
9use uuid::Uuid;
10
11#[derive(Debug, Clone)]
12pub struct SegmentFlushInfo {
13 pub segment_id: SegmentUuid,
14 pub file_paths: HashMap<String, Vec<String>>,
15}
16
17#[derive(Debug, Clone)]
18pub struct CollectionFlushInfo {
19 pub tenant_id: String,
20 pub database_name: DatabaseName,
21 pub collection_id: CollectionUuid,
22 pub log_position: i64,
23 pub collection_version: i32,
24 pub segment_flush_info: Arc<[SegmentFlushInfo]>,
25 pub total_records_post_compaction: u64,
26 pub size_bytes_post_compaction: u64,
27 pub schema: Option<Schema>,
28}
29
30#[derive(Debug, Clone)]
31pub struct AttachedFunctionUpdateInfo {
32 pub attached_function_id: AttachedFunctionUuid,
33 pub completion_offset: u64,
34}
35
36#[derive(Error, Debug)]
37pub enum FinishAttachedFunctionError {
38 #[error("Failed to finish attached function: {0}")]
39 FailedToFinishAttachedFunction(#[from] tonic::Status),
40 #[error("Attached function not found")]
41 AttachedFunctionNotFound,
42}
43
44impl ChromaError for FinishAttachedFunctionError {
45 fn code(&self) -> ErrorCodes {
46 match self {
47 FinishAttachedFunctionError::FailedToFinishAttachedFunction(_) => ErrorCodes::Internal,
48 FinishAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
49 }
50 }
51}
52
53#[derive(Error, Debug)]
54pub enum FinishCreateAttachedFunctionError {
55 #[error("Failed to finish creating attached function: {0}")]
56 FailedToFinishCreateAttachedFunction(#[from] tonic::Status),
57 #[error("Attached function not found")]
58 AttachedFunctionNotFound,
59 #[error("Output collection already exists")]
60 OutputCollectionExists,
61}
62
63impl ChromaError for FinishCreateAttachedFunctionError {
64 fn code(&self) -> ErrorCodes {
65 match self {
66 FinishCreateAttachedFunctionError::FailedToFinishCreateAttachedFunction(_) => {
67 ErrorCodes::Internal
68 }
69 FinishCreateAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
70 FinishCreateAttachedFunctionError::OutputCollectionExists => ErrorCodes::AlreadyExists,
71 }
72 }
73}
74
75#[derive(Error, Debug)]
76pub enum GetMinCompletionOffsetError {
77 #[error("Failed to get min completion offset: {0}")]
78 FailedToGetMinCompletionOffset(#[from] tonic::Status),
79}
80
81impl ChromaError for GetMinCompletionOffsetError {
82 fn code(&self) -> ErrorCodes {
83 ErrorCodes::Internal
84 }
85}
86
87#[derive(Error, Debug)]
88pub enum AdvanceAttachedFunctionError {
89 #[error("Failed to advance attached function: {0}")]
90 FailedToAdvanceAttachedFunction(#[from] tonic::Status),
91 #[error("Attached function not found - nonce mismatch or attached function doesn't exist")]
92 AttachedFunctionNotFound,
93}
94
95impl ChromaError for AdvanceAttachedFunctionError {
96 fn code(&self) -> ErrorCodes {
97 match self {
98 AdvanceAttachedFunctionError::FailedToAdvanceAttachedFunction(_) => {
99 ErrorCodes::Internal
100 }
101 AdvanceAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
102 }
103 }
104}
105
/// Response payload produced when an attached function is advanced.
#[derive(Clone, Debug)]
pub struct AdvanceAttachedFunctionResponse {
    /// Completion offset carried by the response.
    pub completion_offset: u64,
}
110
111impl TryInto<FlushSegmentCompactionInfo> for &SegmentFlushInfo {
112 type Error = SegmentFlushInfoConversionError;
113
114 fn try_into(self) -> Result<FlushSegmentCompactionInfo, Self::Error> {
115 let mut file_paths = HashMap::new();
116 for (key, value) in self.file_paths.clone() {
117 file_paths.insert(key, FilePaths { paths: value });
118 }
119
120 Ok(FlushSegmentCompactionInfo {
121 segment_id: self.segment_id.to_string(),
122 file_paths,
123 })
124 }
125}
126
127#[derive(Error, Debug)]
128pub enum SegmentFlushInfoConversionError {
129 #[error("Invalid segment id, valid UUID required")]
130 InvalidSegmentId,
131 #[error(transparent)]
132 DecodeError(#[from] ConversionError),
133}
134
135#[derive(Error, Debug)]
136pub enum CollectionFlushInfoConversionError {
137 #[error("Failed to convert segment flush info: {0}")]
138 SegmentConversionError(#[from] SegmentFlushInfoConversionError),
139 #[error("Failed to serialize schema")]
140 SchemaSerializationError,
141}
142
143impl TryFrom<CollectionFlushInfo> for chroma_proto::FlushCollectionCompactionRequest {
144 type Error = CollectionFlushInfoConversionError;
145
146 fn try_from(collection: CollectionFlushInfo) -> Result<Self, Self::Error> {
147 let segment_compaction_info = collection
148 .segment_flush_info
149 .iter()
150 .map(|segment_flush_info| segment_flush_info.try_into())
151 .collect::<Result<Vec<_>, _>>()?;
152
153 let schema_str = collection
154 .schema
155 .map(|s| {
156 serde_json::to_string(&s)
157 .map_err(|_| CollectionFlushInfoConversionError::SchemaSerializationError)
158 })
159 .transpose()?;
160
161 Ok(crate::chroma_proto::FlushCollectionCompactionRequest {
162 tenant_id: collection.tenant_id,
163 collection_id: collection.collection_id.0.to_string(),
164 log_position: collection.log_position,
165 collection_version: collection.collection_version,
166 segment_compaction_info,
167 total_records_post_compaction: collection.total_records_post_compaction,
168 size_bytes_post_compaction: collection.size_bytes_post_compaction,
169 schema_str,
170 database_name: Some(collection.database_name.as_ref().to_string()),
171 })
172 }
173}
174
175#[derive(Debug)]
176pub struct FlushCompactionResponse {
177 pub collection_id: CollectionUuid,
178 pub collection_version: i32,
179 pub last_compaction_time: i64,
180}
181
182#[derive(Debug)]
183pub struct FlushCompactionAndAttachedFunctionResponse {
184 pub collections: Vec<FlushCompactionResponse>,
185 pub completion_offset: u64,
187}
188
189impl FlushCompactionResponse {
190 pub fn new(
191 collection_id: CollectionUuid,
192 collection_version: i32,
193 last_compaction_time: i64,
194 ) -> Self {
195 FlushCompactionResponse {
196 collection_id,
197 collection_version,
198 last_compaction_time,
199 }
200 }
201}
202
203impl TryFrom<chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse>
204 for FlushCompactionAndAttachedFunctionResponse
205{
206 type Error = FlushCompactionResponseConversionError;
207
208 fn try_from(
209 value: chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse,
210 ) -> Result<Self, Self::Error> {
211 let mut collections = Vec::with_capacity(value.collections.len());
213 for collection in value.collections {
214 let id = Uuid::parse_str(&collection.collection_id)
215 .map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
216 collections.push(FlushCompactionResponse {
217 collection_id: CollectionUuid(id),
218 collection_version: collection.collection_version,
219 last_compaction_time: collection.last_compaction_time,
220 });
221 }
222
223 let completion_offset = value
227 .attached_function_state
228 .as_ref()
229 .map(|state| state.completion_offset)
230 .unwrap_or(0);
231
232 Ok(FlushCompactionAndAttachedFunctionResponse {
233 collections,
234 completion_offset,
235 })
236 }
237}
238
239#[derive(Error, Debug)]
240pub enum FlushCompactionResponseConversionError {
241 #[error(transparent)]
242 DecodeError(#[from] ConversionError),
243 #[error("Invalid collection id, valid UUID required")]
244 InvalidUuid,
245 #[error("Invalid attached function nonce, valid UUID required")]
246 InvalidAttachedFunctionNonce,
247 #[error("Invalid timestamp format")]
248 InvalidTimestamp,
249 #[error("Missing collections in response")]
250 MissingCollections,
251}
252
253impl ChromaError for FlushCompactionResponseConversionError {
254 fn code(&self) -> ErrorCodes {
255 match self {
256 FlushCompactionResponseConversionError::InvalidUuid => ErrorCodes::InvalidArgument,
257 FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
258 ErrorCodes::InvalidArgument
259 }
260 FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
261 FlushCompactionResponseConversionError::MissingCollections => {
262 ErrorCodes::InvalidArgument
263 }
264 FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
265 }
266 }
267}