1use super::{AttachedFunctionUuid, CollectionUuid, ConversionError, Schema};
2use crate::{
3 chroma_proto::{self, FilePaths, FlushSegmentCompactionInfo},
4 SegmentUuid,
5};
6use chroma_error::{ChromaError, ErrorCodes};
7use std::{collections::HashMap, sync::Arc};
8use thiserror::Error;
9use uuid::Uuid;
10
11#[derive(Debug, Clone)]
12pub struct SegmentFlushInfo {
13 pub segment_id: SegmentUuid,
14 pub file_paths: HashMap<String, Vec<String>>,
15}
16
17#[derive(Debug, Clone)]
18pub struct CollectionFlushInfo {
19 pub tenant_id: String,
20 pub collection_id: CollectionUuid,
21 pub log_position: i64,
22 pub collection_version: i32,
23 pub segment_flush_info: Arc<[SegmentFlushInfo]>,
24 pub total_records_post_compaction: u64,
25 pub size_bytes_post_compaction: u64,
26 pub schema: Option<Schema>,
27}
28
29#[derive(Debug, Clone)]
30pub struct AttachedFunctionUpdateInfo {
31 pub attached_function_id: AttachedFunctionUuid,
32 pub completion_offset: u64,
33}
34
35#[derive(Error, Debug)]
36pub enum FinishAttachedFunctionError {
37 #[error("Failed to finish attached function: {0}")]
38 FailedToFinishAttachedFunction(#[from] tonic::Status),
39 #[error("Attached function not found")]
40 AttachedFunctionNotFound,
41}
42
43impl ChromaError for FinishAttachedFunctionError {
44 fn code(&self) -> ErrorCodes {
45 match self {
46 FinishAttachedFunctionError::FailedToFinishAttachedFunction(_) => ErrorCodes::Internal,
47 FinishAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
48 }
49 }
50}
51
52#[derive(Error, Debug)]
53pub enum FinishCreateAttachedFunctionError {
54 #[error("Failed to finish creating attached function: {0}")]
55 FailedToFinishCreateAttachedFunction(#[from] tonic::Status),
56 #[error("Attached function not found")]
57 AttachedFunctionNotFound,
58 #[error("Output collection already exists")]
59 OutputCollectionExists,
60}
61
62impl ChromaError for FinishCreateAttachedFunctionError {
63 fn code(&self) -> ErrorCodes {
64 match self {
65 FinishCreateAttachedFunctionError::FailedToFinishCreateAttachedFunction(_) => {
66 ErrorCodes::Internal
67 }
68 FinishCreateAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
69 FinishCreateAttachedFunctionError::OutputCollectionExists => ErrorCodes::AlreadyExists,
70 }
71 }
72}
73
74#[derive(Error, Debug)]
75pub enum GetMinCompletionOffsetError {
76 #[error("Failed to get min completion offset: {0}")]
77 FailedToGetMinCompletionOffset(#[from] tonic::Status),
78}
79
80impl ChromaError for GetMinCompletionOffsetError {
81 fn code(&self) -> ErrorCodes {
82 ErrorCodes::Internal
83 }
84}
85
86#[derive(Error, Debug)]
87pub enum AdvanceAttachedFunctionError {
88 #[error("Failed to advance attached function: {0}")]
89 FailedToAdvanceAttachedFunction(#[from] tonic::Status),
90 #[error("Attached function not found - nonce mismatch or attached function doesn't exist")]
91 AttachedFunctionNotFound,
92}
93
94impl ChromaError for AdvanceAttachedFunctionError {
95 fn code(&self) -> ErrorCodes {
96 match self {
97 AdvanceAttachedFunctionError::FailedToAdvanceAttachedFunction(_) => {
98 ErrorCodes::Internal
99 }
100 AdvanceAttachedFunctionError::AttachedFunctionNotFound => ErrorCodes::NotFound,
101 }
102 }
103}
104
/// Response returned after advancing an attached function.
#[derive(Clone, Debug)]
pub struct AdvanceAttachedFunctionResponse {
    /// Completion offset reported in the response.
    pub completion_offset: u64,
}
109
110impl TryInto<FlushSegmentCompactionInfo> for &SegmentFlushInfo {
111 type Error = SegmentFlushInfoConversionError;
112
113 fn try_into(self) -> Result<FlushSegmentCompactionInfo, Self::Error> {
114 let mut file_paths = HashMap::new();
115 for (key, value) in self.file_paths.clone() {
116 file_paths.insert(key, FilePaths { paths: value });
117 }
118
119 Ok(FlushSegmentCompactionInfo {
120 segment_id: self.segment_id.to_string(),
121 file_paths,
122 })
123 }
124}
125
126#[derive(Error, Debug)]
127pub enum SegmentFlushInfoConversionError {
128 #[error("Invalid segment id, valid UUID required")]
129 InvalidSegmentId,
130 #[error(transparent)]
131 DecodeError(#[from] ConversionError),
132}
133
134#[derive(Error, Debug)]
135pub enum CollectionFlushInfoConversionError {
136 #[error("Failed to convert segment flush info: {0}")]
137 SegmentConversionError(#[from] SegmentFlushInfoConversionError),
138 #[error("Failed to serialize schema")]
139 SchemaSerializationError,
140}
141
142impl TryFrom<CollectionFlushInfo> for chroma_proto::FlushCollectionCompactionRequest {
143 type Error = CollectionFlushInfoConversionError;
144
145 fn try_from(collection: CollectionFlushInfo) -> Result<Self, Self::Error> {
146 let segment_compaction_info = collection
147 .segment_flush_info
148 .iter()
149 .map(|segment_flush_info| segment_flush_info.try_into())
150 .collect::<Result<Vec<_>, _>>()?;
151
152 let schema_str = collection
153 .schema
154 .map(|s| {
155 serde_json::to_string(&s)
156 .map_err(|_| CollectionFlushInfoConversionError::SchemaSerializationError)
157 })
158 .transpose()?;
159
160 Ok(crate::chroma_proto::FlushCollectionCompactionRequest {
161 tenant_id: collection.tenant_id,
162 collection_id: collection.collection_id.0.to_string(),
163 log_position: collection.log_position,
164 collection_version: collection.collection_version,
165 segment_compaction_info,
166 total_records_post_compaction: collection.total_records_post_compaction,
167 size_bytes_post_compaction: collection.size_bytes_post_compaction,
168 schema_str,
169 })
170 }
171}
172
173#[derive(Debug)]
174pub struct FlushCompactionResponse {
175 pub collection_id: CollectionUuid,
176 pub collection_version: i32,
177 pub last_compaction_time: i64,
178}
179
180#[derive(Debug)]
181pub struct FlushCompactionAndAttachedFunctionResponse {
182 pub collections: Vec<FlushCompactionResponse>,
183 pub completion_offset: u64,
185}
186
187impl FlushCompactionResponse {
188 pub fn new(
189 collection_id: CollectionUuid,
190 collection_version: i32,
191 last_compaction_time: i64,
192 ) -> Self {
193 FlushCompactionResponse {
194 collection_id,
195 collection_version,
196 last_compaction_time,
197 }
198 }
199}
200
201impl TryFrom<chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse>
202 for FlushCompactionAndAttachedFunctionResponse
203{
204 type Error = FlushCompactionResponseConversionError;
205
206 fn try_from(
207 value: chroma_proto::FlushCollectionCompactionAndAttachedFunctionResponse,
208 ) -> Result<Self, Self::Error> {
209 let mut collections = Vec::with_capacity(value.collections.len());
211 for collection in value.collections {
212 let id = Uuid::parse_str(&collection.collection_id)
213 .map_err(|_| FlushCompactionResponseConversionError::InvalidUuid)?;
214 collections.push(FlushCompactionResponse {
215 collection_id: CollectionUuid(id),
216 collection_version: collection.collection_version,
217 last_compaction_time: collection.last_compaction_time,
218 });
219 }
220
221 let completion_offset = value
225 .attached_function_state
226 .as_ref()
227 .map(|state| state.completion_offset)
228 .unwrap_or(0);
229
230 Ok(FlushCompactionAndAttachedFunctionResponse {
231 collections,
232 completion_offset,
233 })
234 }
235}
236
237#[derive(Error, Debug)]
238pub enum FlushCompactionResponseConversionError {
239 #[error(transparent)]
240 DecodeError(#[from] ConversionError),
241 #[error("Invalid collection id, valid UUID required")]
242 InvalidUuid,
243 #[error("Invalid attached function nonce, valid UUID required")]
244 InvalidAttachedFunctionNonce,
245 #[error("Invalid timestamp format")]
246 InvalidTimestamp,
247 #[error("Missing collections in response")]
248 MissingCollections,
249}
250
251impl ChromaError for FlushCompactionResponseConversionError {
252 fn code(&self) -> ErrorCodes {
253 match self {
254 FlushCompactionResponseConversionError::InvalidUuid => ErrorCodes::InvalidArgument,
255 FlushCompactionResponseConversionError::InvalidAttachedFunctionNonce => {
256 ErrorCodes::InvalidArgument
257 }
258 FlushCompactionResponseConversionError::InvalidTimestamp => ErrorCodes::InvalidArgument,
259 FlushCompactionResponseConversionError::MissingCollections => {
260 ErrorCodes::InvalidArgument
261 }
262 FlushCompactionResponseConversionError::DecodeError(e) => e.code(),
263 }
264 }
265}