1use std::path::Path;
6use std::sync::Arc;
7
8use serde::{Deserialize, Serialize};
9
10use sochdb_storage::database::{Database, DatabaseConfig};
11
12use crate::config::EngineConfig;
13use crate::error::{Error, Result};
14use crate::types::*;
15
16pub struct Catalog {
18 db: Arc<Database>,
19}
20
21impl Catalog {
22 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
24 let config = DatabaseConfig {
25 group_commit: true,
26 ..Default::default()
27 };
28 let db =
29 Database::open_with_config(path, config).map_err(|e| Error::Storage(e.to_string()))?;
30
31 Ok(Self { db })
32 }
33
34 pub fn open_memory() -> Result<Self> {
36 let temp_dir = tempfile::tempdir().map_err(|e| Error::Storage(e.to_string()))?;
37 Self::open(temp_dir.path())
38 }
39
40 fn now_secs() -> i64 {
42 use std::time::{SystemTime, UNIX_EPOCH};
43 SystemTime::now()
44 .duration_since(UNIX_EPOCH)
45 .unwrap()
46 .as_secs() as i64
47 }
48
49 pub fn create_collection(&self, name: &str, config: &EngineConfig) -> Result<i64> {
51 let config_json =
52 serde_json::to_string(config).map_err(|e| Error::Serialization(e.to_string()))?;
53
54 let metric = match config.metric {
55 Metric::DotProduct => "dot_product",
56 Metric::Cosine => "cosine",
57 };
58
59 let txn = self
60 .db
61 .begin_transaction()
62 .map_err(|e| Error::Storage(e.to_string()))?;
63
64 let id = Self::now_secs();
66
67 let key = format!("collections/{}", name);
68 let value = serde_json::json!({
69 "id": id,
70 "name": name,
71 "dim": config.dim,
72 "metric": metric,
73 "config_json": config_json,
74 "created_at": Self::now_secs()
75 });
76
77 self.db
78 .put(txn, key.as_bytes(), value.to_string().as_bytes())
79 .map_err(|e| Error::Storage(e.to_string()))?;
80
81 self.db
82 .commit(txn)
83 .map_err(|e| Error::Storage(e.to_string()))?;
84
85 Ok(id)
86 }
87
88 pub fn get_collection(&self, name: &str) -> Result<CollectionInfo> {
90 let key = format!("collections/{}", name);
91
92 let txn = self
93 .db
94 .begin_transaction()
95 .map_err(|e| Error::Storage(e.to_string()))?;
96 let value = self
97 .db
98 .get(txn, key.as_bytes())
99 .map_err(|e| Error::Storage(e.to_string()))?
100 .ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
101 self.db
102 .commit(txn)
103 .map_err(|e| Error::Storage(e.to_string()))?;
104
105 let json: serde_json::Value =
106 serde_json::from_slice(&value).map_err(|e| Error::Serialization(e.to_string()))?;
107
108 Ok(CollectionInfo {
109 id: json["id"].as_i64().unwrap_or(0),
110 name: json["name"].as_str().unwrap_or("").to_string(),
111 dim: json["dim"].as_u64().unwrap_or(0) as u32,
112 metric: json["metric"].as_str().unwrap_or("dot_product").to_string(),
113 config_json: json["config_json"].as_str().unwrap_or("{}").to_string(),
114 })
115 }
116
117 pub fn list_collections(&self) -> Result<Vec<CollectionInfo>> {
119 let txn = self
120 .db
121 .begin_transaction()
122 .map_err(|e| Error::Storage(e.to_string()))?;
123
124 let prefix = b"collections/";
125 let entries = self
126 .db
127 .scan(txn, prefix)
128 .map_err(|e| Error::Storage(e.to_string()))?;
129
130 self.db
131 .commit(txn)
132 .map_err(|e| Error::Storage(e.to_string()))?;
133
134 let mut collections = Vec::new();
135 for (_key, value) in entries {
136 if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
137 collections.push(CollectionInfo {
138 id: json["id"].as_i64().unwrap_or(0),
139 name: json["name"].as_str().unwrap_or("").to_string(),
140 dim: json["dim"].as_u64().unwrap_or(0) as u32,
141 metric: json["metric"].as_str().unwrap_or("dot_product").to_string(),
142 config_json: json["config_json"].as_str().unwrap_or("{}").to_string(),
143 });
144 }
145 }
146
147 Ok(collections)
148 }
149
150 pub fn add_segment(&self, collection_id: i64, segment: &SegmentInfo) -> Result<()> {
152 let txn = self
153 .db
154 .begin_transaction()
155 .map_err(|e| Error::Storage(e.to_string()))?;
156
157 let key = format!("segments/{}/{}", collection_id, segment.id);
158 let value = serde_json::json!({
159 "id": segment.id,
160 "collection_id": collection_id,
161 "path": segment.path,
162 "state": segment.state.to_string(),
163 "n_vec": segment.n_vec,
164 "min_vec_id": segment.min_vec_id,
165 "max_vec_id": segment.max_vec_id,
166 "created_at": Self::now_secs()
167 });
168
169 self.db
170 .put(txn, key.as_bytes(), value.to_string().as_bytes())
171 .map_err(|e| Error::Storage(e.to_string()))?;
172
173 self.db
174 .commit(txn)
175 .map_err(|e| Error::Storage(e.to_string()))?;
176
177 Ok(())
178 }
179
180 pub fn get_segments(&self, collection_id: i64) -> Result<Vec<SegmentInfo>> {
182 let txn = self
183 .db
184 .begin_transaction()
185 .map_err(|e| Error::Storage(e.to_string()))?;
186
187 let prefix = format!("segments/{}/", collection_id);
188 let entries = self
189 .db
190 .scan(txn, prefix.as_bytes())
191 .map_err(|e| Error::Storage(e.to_string()))?;
192
193 self.db
194 .commit(txn)
195 .map_err(|e| Error::Storage(e.to_string()))?;
196
197 let mut segments = Vec::new();
198 for (_key, value) in entries {
199 if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
200 let state_str = json["state"].as_str().unwrap_or("sealed");
201 if state_str != "deleted" {
202 segments.push(SegmentInfo {
203 id: json["id"].as_u64().unwrap_or(0),
204 path: json["path"].as_str().unwrap_or("").to_string(),
205 state: SegmentState::from_str(state_str),
206 n_vec: json["n_vec"].as_u64().unwrap_or(0) as u32,
207 min_vec_id: json["min_vec_id"].as_u64().map(|v| v as u32),
208 max_vec_id: json["max_vec_id"].as_u64().map(|v| v as u32),
209 });
210 }
211 }
212 }
213
214 segments.sort_by(|a, b| b.id.cmp(&a.id));
216 Ok(segments)
217 }
218
219 pub fn update_segment_state(&self, segment_id: u64, state: SegmentState) -> Result<()> {
221 let txn = self
224 .db
225 .begin_transaction()
226 .map_err(|e| Error::Storage(e.to_string()))?;
227
228 let prefix = b"segments/";
229 let entries = self
230 .db
231 .scan(txn, prefix)
232 .map_err(|e| Error::Storage(e.to_string()))?;
233
234 let mut found = false;
236 for (key, value) in &entries {
237 if let Ok(mut json) = serde_json::from_slice::<serde_json::Value>(value) {
238 if json["id"].as_u64() == Some(segment_id) {
239 json["state"] = serde_json::Value::String(state.to_string().to_owned());
240
241 let txn2 = self
242 .db
243 .begin_transaction()
244 .map_err(|e| Error::Storage(e.to_string()))?;
245 self.db
246 .put(txn2, key, json.to_string().as_bytes())
247 .map_err(|e| Error::Storage(e.to_string()))?;
248 self.db
249 .commit(txn2)
250 .map_err(|e| Error::Storage(e.to_string()))?;
251
252 found = true;
253 break;
254 }
255 }
256 }
257
258 self.db
259 .commit(txn)
260 .map_err(|e| Error::Storage(e.to_string()))?;
261
262 if !found {
263 tracing::warn!("update_segment_state: segment {} not found", segment_id);
264 }
265
266 Ok(())
267 }
268
269 pub fn add_tombstone(&self, collection_id: i64, segment_id: u64, vec_id: u32) -> Result<()> {
271 let txn = self
272 .db
273 .begin_transaction()
274 .map_err(|e| Error::Storage(e.to_string()))?;
275
276 let key = format!("tombstones/{}/{}/{}", collection_id, segment_id, vec_id);
277 let value = serde_json::json!({
278 "collection_id": collection_id,
279 "segment_id": segment_id,
280 "vec_id": vec_id,
281 "created_at": Self::now_secs()
282 });
283
284 self.db
285 .put(txn, key.as_bytes(), value.to_string().as_bytes())
286 .map_err(|e| Error::Storage(e.to_string()))?;
287
288 self.db
289 .commit(txn)
290 .map_err(|e| Error::Storage(e.to_string()))?;
291
292 Ok(())
293 }
294
295 pub fn get_tombstones(&self, segment_id: u64) -> Result<Vec<u32>> {
297 let txn = self
298 .db
299 .begin_transaction()
300 .map_err(|e| Error::Storage(e.to_string()))?;
301
302 let prefix = b"tombstones/";
304 let entries = self
305 .db
306 .scan(txn, prefix)
307 .map_err(|e| Error::Storage(e.to_string()))?;
308
309 self.db
310 .commit(txn)
311 .map_err(|e| Error::Storage(e.to_string()))?;
312
313 let mut tombstones = Vec::new();
314 for (_key, value) in entries {
315 if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
316 if json["segment_id"].as_u64() == Some(segment_id) {
317 if let Some(vec_id) = json["vec_id"].as_u64() {
318 tombstones.push(vec_id as u32);
319 }
320 }
321 }
322 }
323
324 tombstones.sort();
325 Ok(tombstones)
326 }
327
328 pub fn clear_tombstones(&self, segment_id: u64) -> Result<()> {
330 let txn = self
331 .db
332 .begin_transaction()
333 .map_err(|e| Error::Storage(e.to_string()))?;
334
335 let prefix = b"tombstones/";
337 let entries = self
338 .db
339 .scan(txn, prefix)
340 .map_err(|e| Error::Storage(e.to_string()))?;
341
342 self.db
343 .commit(txn)
344 .map_err(|e| Error::Storage(e.to_string()))?;
345
346 let mut keys_to_delete = Vec::new();
348 for (key, value) in &entries {
349 if let Ok(json) = serde_json::from_slice::<serde_json::Value>(value) {
350 if json["segment_id"].as_u64() == Some(segment_id) {
351 keys_to_delete.push(key.clone());
352 }
353 }
354 }
355
356 if !keys_to_delete.is_empty() {
358 let txn = self
359 .db
360 .begin_transaction()
361 .map_err(|e| Error::Storage(e.to_string()))?;
362
363 for key in &keys_to_delete {
364 self.db
365 .delete(txn, key)
366 .map_err(|e| Error::Storage(e.to_string()))?;
367 }
368
369 self.db
370 .commit(txn)
371 .map_err(|e| Error::Storage(e.to_string()))?;
372
373 tracing::info!(
374 "clear_tombstones: removed {} tombstones for segment {}",
375 keys_to_delete.len(),
376 segment_id
377 );
378 }
379
380 Ok(())
381 }
382
383 pub fn get_vector_count(&self, collection_id: i64) -> Result<u64> {
385 let segments = self.get_segments(collection_id)?;
386 let total: u64 = segments.iter().map(|s| s.n_vec as u64).sum();
387 Ok(total)
388 }
389
390 pub fn begin_transaction(&self) -> Result<()> {
392 Ok(())
394 }
395
396 pub fn commit(&self) -> Result<()> {
398 Ok(())
400 }
401
402 pub fn rollback(&self) -> Result<()> {
404 Ok(())
406 }
407
408 pub fn checkpoint(&self) -> Result<()> {
410 Ok(())
412 }
413}
414
415#[derive(Debug, Clone, Serialize, Deserialize)]
417pub struct CollectionInfo {
418 pub id: i64,
419 pub name: String,
420 pub dim: u32,
421 pub metric: String,
422 pub config_json: String,
423}
424
425impl CollectionInfo {
426 pub fn config(&self) -> Result<EngineConfig> {
428 serde_json::from_str(&self.config_json).map_err(|e| Error::Serialization(e.to_string()))
429 }
430}
431
/// Catalog record describing one segment of a collection.
#[derive(Debug, Clone)]
pub struct SegmentInfo {
    /// Segment id; `Catalog::get_segments` returns segments ordered by this id, highest first.
    pub id: u64,
    /// Location of the segment's data, stored verbatim. Presumably a filesystem path (the
    /// tests use `/data/segment_1.seg`) — confirm with the segment writer.
    pub path: String,
    /// Lifecycle state; `Catalog::get_segments` omits segments whose state is `Deleted`.
    pub state: SegmentState,
    /// Number of vectors in the segment; summed by `Catalog::get_vector_count`.
    pub n_vec: u32,
    /// Smallest vector id in the segment, if recorded.
    /// NOTE(review): presumably an inclusive bound (the tests pair 0..=9999 with n_vec 10000).
    pub min_vec_id: Option<u32>,
    /// Largest vector id in the segment, if recorded (presumably inclusive; see `min_vec_id`).
    pub max_vec_id: Option<u32>,
}
442
443impl SegmentState {
444 fn to_string(&self) -> &'static str {
445 match self {
446 SegmentState::Mutable => "mutable",
447 SegmentState::Sealing => "sealing",
448 SegmentState::Sealed => "sealed",
449 SegmentState::Compacting => "compacting",
450 SegmentState::Deleted => "deleted",
451 }
452 }
453
454 fn from_str(s: &str) -> Self {
455 match s {
456 "mutable" => SegmentState::Mutable,
457 "sealing" => SegmentState::Sealing,
458 "sealed" => SegmentState::Sealed,
459 "compacting" => SegmentState::Compacting,
460 "deleted" => SegmentState::Deleted,
461 _ => SegmentState::Sealed,
462 }
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a collection, a segment, and a tombstone through the catalog.
    #[test]
    fn test_catalog_operations() {
        let catalog = Catalog::open_memory().unwrap();

        let config = EngineConfig::with_dim(768);
        let collection_id = catalog.create_collection("test", &config).unwrap();
        assert!(collection_id > 0);

        let info = catalog.get_collection("test").unwrap();
        assert_eq!(info.dim, 768);
        assert_eq!(info.name, "test");

        let segment = SegmentInfo {
            id: 1,
            path: "/data/segment_1.seg".to_string(),
            state: SegmentState::Sealed,
            n_vec: 10000,
            min_vec_id: Some(0),
            max_vec_id: Some(9999),
        };
        catalog.add_segment(collection_id, &segment).unwrap();

        let segments = catalog.get_segments(collection_id).unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].n_vec, 10000);

        catalog.add_tombstone(collection_id, 1, 500).unwrap();
        let tombstones = catalog.get_tombstones(1).unwrap();
        assert_eq!(tombstones, vec![500]);
    }

    /// Looking up an unknown name yields `CollectionNotFound`.
    #[test]
    fn test_collection_not_found() {
        let catalog = Catalog::open_memory().unwrap();
        let result = catalog.get_collection("nonexistent");
        assert!(matches!(result, Err(Error::CollectionNotFound(_))));
    }

    /// Every created collection is returned by `list_collections`.
    #[test]
    fn test_list_collections() {
        let catalog = Catalog::open_memory().unwrap();
        let config = EngineConfig::with_dim(4);
        catalog.create_collection("a", &config).unwrap();
        catalog.create_collection("b", &config).unwrap();

        let mut names: Vec<String> = catalog
            .list_collections()
            .unwrap()
            .into_iter()
            .map(|c| c.name)
            .collect();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    /// Segment ordering, state updates, vector counts, and tombstone clearing.
    #[test]
    fn test_segment_state_and_tombstone_lifecycle() {
        let catalog = Catalog::open_memory().unwrap();
        let collection_id = catalog
            .create_collection("lifecycle", &EngineConfig::with_dim(8))
            .unwrap();

        for id in 1..=2u64 {
            let segment = SegmentInfo {
                id,
                path: format!("/data/segment_{}.seg", id),
                state: SegmentState::Sealed,
                n_vec: 10,
                min_vec_id: None,
                max_vec_id: None,
            };
            catalog.add_segment(collection_id, &segment).unwrap();
        }

        // Highest segment id first.
        let ids: Vec<u64> = catalog
            .get_segments(collection_id)
            .unwrap()
            .iter()
            .map(|s| s.id)
            .collect();
        assert_eq!(ids, vec![2, 1]);
        assert_eq!(catalog.get_vector_count(collection_id).unwrap(), 20);

        // Deleted segments disappear from listings and counts.
        catalog
            .update_segment_state(1, SegmentState::Deleted)
            .unwrap();
        let ids: Vec<u64> = catalog
            .get_segments(collection_id)
            .unwrap()
            .iter()
            .map(|s| s.id)
            .collect();
        assert_eq!(ids, vec![2]);
        assert_eq!(catalog.get_vector_count(collection_id).unwrap(), 10);

        // An unknown segment id is logged, not reported as an error.
        catalog
            .update_segment_state(99, SegmentState::Sealed)
            .unwrap();

        catalog.add_tombstone(collection_id, 2, 7).unwrap();
        catalog.add_tombstone(collection_id, 2, 3).unwrap();
        assert_eq!(catalog.get_tombstones(2).unwrap(), vec![3, 7]);

        catalog.clear_tombstones(2).unwrap();
        assert!(catalog.get_tombstones(2).unwrap().is_empty());
    }
}
511}