Skip to main content

sochdb_vector/catalog/
sochdb_catalog.rs

1//! SochDB-based catalog implementation.
2//!
3//! Uses SochDB's durable storage for unified transaction semantics.
4
5use std::path::Path;
6use std::sync::Arc;
7
8use serde::{Deserialize, Serialize};
9
10use sochdb_storage::database::{Database, DatabaseConfig};
11
12use crate::config::EngineConfig;
13use crate::error::{Error, Result};
14use crate::types::*;
15
16/// Catalog for managing segment metadata using SochDB storage
17pub struct Catalog {
18    db: Arc<Database>,
19}
20
21impl Catalog {
22    /// Open or create a catalog database
23    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
24        let config = DatabaseConfig {
25            group_commit: true,
26            ..Default::default()
27        };
28        let db =
29            Database::open_with_config(path, config).map_err(|e| Error::Storage(e.to_string()))?;
30
31        Ok(Self { db })
32    }
33
34    /// Open an in-memory catalog (for testing)
35    pub fn open_memory() -> Result<Self> {
36        let temp_dir = tempfile::tempdir().map_err(|e| Error::Storage(e.to_string()))?;
37        Self::open(temp_dir.path())
38    }
39
40    /// Get current timestamp in seconds
41    fn now_secs() -> i64 {
42        use std::time::{SystemTime, UNIX_EPOCH};
43        SystemTime::now()
44            .duration_since(UNIX_EPOCH)
45            .unwrap()
46            .as_secs() as i64
47    }
48
49    /// Create a new collection
50    pub fn create_collection(&self, name: &str, config: &EngineConfig) -> Result<i64> {
51        let config_json =
52            serde_json::to_string(config).map_err(|e| Error::Serialization(e.to_string()))?;
53
54        let metric = match config.metric {
55            Metric::DotProduct => "dot_product",
56            Metric::Cosine => "cosine",
57        };
58
59        let txn = self
60            .db
61            .begin_transaction()
62            .map_err(|e| Error::Storage(e.to_string()))?;
63
64        // Use timestamp as ID for simplicity
65        let id = Self::now_secs();
66
67        let key = format!("collections/{}", name);
68        let value = serde_json::json!({
69            "id": id,
70            "name": name,
71            "dim": config.dim,
72            "metric": metric,
73            "config_json": config_json,
74            "created_at": Self::now_secs()
75        });
76
77        self.db
78            .put(txn, key.as_bytes(), value.to_string().as_bytes())
79            .map_err(|e| Error::Storage(e.to_string()))?;
80
81        self.db
82            .commit(txn)
83            .map_err(|e| Error::Storage(e.to_string()))?;
84
85        Ok(id)
86    }
87
88    /// Get collection by name
89    pub fn get_collection(&self, name: &str) -> Result<CollectionInfo> {
90        let key = format!("collections/{}", name);
91
92        let txn = self
93            .db
94            .begin_transaction()
95            .map_err(|e| Error::Storage(e.to_string()))?;
96        let value = self
97            .db
98            .get(txn, key.as_bytes())
99            .map_err(|e| Error::Storage(e.to_string()))?
100            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))?;
101        self.db
102            .commit(txn)
103            .map_err(|e| Error::Storage(e.to_string()))?;
104
105        let json: serde_json::Value =
106            serde_json::from_slice(&value).map_err(|e| Error::Serialization(e.to_string()))?;
107
108        Ok(CollectionInfo {
109            id: json["id"].as_i64().unwrap_or(0),
110            name: json["name"].as_str().unwrap_or("").to_string(),
111            dim: json["dim"].as_u64().unwrap_or(0) as u32,
112            metric: json["metric"].as_str().unwrap_or("dot_product").to_string(),
113            config_json: json["config_json"].as_str().unwrap_or("{}").to_string(),
114        })
115    }
116
117    /// List all collections
118    pub fn list_collections(&self) -> Result<Vec<CollectionInfo>> {
119        let txn = self
120            .db
121            .begin_transaction()
122            .map_err(|e| Error::Storage(e.to_string()))?;
123
124        let prefix = b"collections/";
125        let entries = self
126            .db
127            .scan(txn, prefix)
128            .map_err(|e| Error::Storage(e.to_string()))?;
129
130        self.db
131            .commit(txn)
132            .map_err(|e| Error::Storage(e.to_string()))?;
133
134        let mut collections = Vec::new();
135        for (_key, value) in entries {
136            if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
137                collections.push(CollectionInfo {
138                    id: json["id"].as_i64().unwrap_or(0),
139                    name: json["name"].as_str().unwrap_or("").to_string(),
140                    dim: json["dim"].as_u64().unwrap_or(0) as u32,
141                    metric: json["metric"].as_str().unwrap_or("dot_product").to_string(),
142                    config_json: json["config_json"].as_str().unwrap_or("{}").to_string(),
143                });
144            }
145        }
146
147        Ok(collections)
148    }
149
150    /// Register a new segment
151    pub fn add_segment(&self, collection_id: i64, segment: &SegmentInfo) -> Result<()> {
152        let txn = self
153            .db
154            .begin_transaction()
155            .map_err(|e| Error::Storage(e.to_string()))?;
156
157        let key = format!("segments/{}/{}", collection_id, segment.id);
158        let value = serde_json::json!({
159            "id": segment.id,
160            "collection_id": collection_id,
161            "path": segment.path,
162            "state": segment.state.to_string(),
163            "n_vec": segment.n_vec,
164            "min_vec_id": segment.min_vec_id,
165            "max_vec_id": segment.max_vec_id,
166            "created_at": Self::now_secs()
167        });
168
169        self.db
170            .put(txn, key.as_bytes(), value.to_string().as_bytes())
171            .map_err(|e| Error::Storage(e.to_string()))?;
172
173        self.db
174            .commit(txn)
175            .map_err(|e| Error::Storage(e.to_string()))?;
176
177        Ok(())
178    }
179
180    /// Get all active segments for a collection
181    pub fn get_segments(&self, collection_id: i64) -> Result<Vec<SegmentInfo>> {
182        let txn = self
183            .db
184            .begin_transaction()
185            .map_err(|e| Error::Storage(e.to_string()))?;
186
187        let prefix = format!("segments/{}/", collection_id);
188        let entries = self
189            .db
190            .scan(txn, prefix.as_bytes())
191            .map_err(|e| Error::Storage(e.to_string()))?;
192
193        self.db
194            .commit(txn)
195            .map_err(|e| Error::Storage(e.to_string()))?;
196
197        let mut segments = Vec::new();
198        for (_key, value) in entries {
199            if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
200                let state_str = json["state"].as_str().unwrap_or("sealed");
201                if state_str != "deleted" {
202                    segments.push(SegmentInfo {
203                        id: json["id"].as_u64().unwrap_or(0),
204                        path: json["path"].as_str().unwrap_or("").to_string(),
205                        state: SegmentState::from_str(state_str),
206                        n_vec: json["n_vec"].as_u64().unwrap_or(0) as u32,
207                        min_vec_id: json["min_vec_id"].as_u64().map(|v| v as u32),
208                        max_vec_id: json["max_vec_id"].as_u64().map(|v| v as u32),
209                    });
210                }
211            }
212        }
213
214        // Sort by ID descending (newest first)
215        segments.sort_by(|a, b| b.id.cmp(&a.id));
216        Ok(segments)
217    }
218
219    /// Update segment state
220    pub fn update_segment_state(&self, segment_id: u64, state: SegmentState) -> Result<()> {
221        // Scan all collection segment prefixes to find the segment by ID.
222        // Key format: segments/{collection_id}/{segment_id}
223        let txn = self
224            .db
225            .begin_transaction()
226            .map_err(|e| Error::Storage(e.to_string()))?;
227
228        let prefix = b"segments/";
229        let entries = self
230            .db
231            .scan(txn, prefix)
232            .map_err(|e| Error::Storage(e.to_string()))?;
233
234        // Find the entry matching segment_id and update its state
235        let mut found = false;
236        for (key, value) in &entries {
237            if let Ok(mut json) = serde_json::from_slice::<serde_json::Value>(value) {
238                if json["id"].as_u64() == Some(segment_id) {
239                    json["state"] = serde_json::Value::String(state.to_string().to_owned());
240
241                    let txn2 = self
242                        .db
243                        .begin_transaction()
244                        .map_err(|e| Error::Storage(e.to_string()))?;
245                    self.db
246                        .put(txn2, key, json.to_string().as_bytes())
247                        .map_err(|e| Error::Storage(e.to_string()))?;
248                    self.db
249                        .commit(txn2)
250                        .map_err(|e| Error::Storage(e.to_string()))?;
251
252                    found = true;
253                    break;
254                }
255            }
256        }
257
258        self.db
259            .commit(txn)
260            .map_err(|e| Error::Storage(e.to_string()))?;
261
262        if !found {
263            tracing::warn!("update_segment_state: segment {} not found", segment_id);
264        }
265
266        Ok(())
267    }
268
269    /// Add a tombstone
270    pub fn add_tombstone(&self, collection_id: i64, segment_id: u64, vec_id: u32) -> Result<()> {
271        let txn = self
272            .db
273            .begin_transaction()
274            .map_err(|e| Error::Storage(e.to_string()))?;
275
276        let key = format!("tombstones/{}/{}/{}", collection_id, segment_id, vec_id);
277        let value = serde_json::json!({
278            "collection_id": collection_id,
279            "segment_id": segment_id,
280            "vec_id": vec_id,
281            "created_at": Self::now_secs()
282        });
283
284        self.db
285            .put(txn, key.as_bytes(), value.to_string().as_bytes())
286            .map_err(|e| Error::Storage(e.to_string()))?;
287
288        self.db
289            .commit(txn)
290            .map_err(|e| Error::Storage(e.to_string()))?;
291
292        Ok(())
293    }
294
295    /// Get tombstones for a segment
296    pub fn get_tombstones(&self, segment_id: u64) -> Result<Vec<u32>> {
297        let txn = self
298            .db
299            .begin_transaction()
300            .map_err(|e| Error::Storage(e.to_string()))?;
301
302        // Scan all tombstones and filter by segment_id
303        let prefix = b"tombstones/";
304        let entries = self
305            .db
306            .scan(txn, prefix)
307            .map_err(|e| Error::Storage(e.to_string()))?;
308
309        self.db
310            .commit(txn)
311            .map_err(|e| Error::Storage(e.to_string()))?;
312
313        let mut tombstones = Vec::new();
314        for (_key, value) in entries {
315            if let Ok(json) = serde_json::from_slice::<serde_json::Value>(&value) {
316                if json["segment_id"].as_u64() == Some(segment_id) {
317                    if let Some(vec_id) = json["vec_id"].as_u64() {
318                        tombstones.push(vec_id as u32);
319                    }
320                }
321            }
322        }
323
324        tombstones.sort();
325        Ok(tombstones)
326    }
327
328    /// Delete tombstones for a segment (after compaction)
329    pub fn clear_tombstones(&self, segment_id: u64) -> Result<()> {
330        let txn = self
331            .db
332            .begin_transaction()
333            .map_err(|e| Error::Storage(e.to_string()))?;
334
335        // Scan all tombstone entries: tombstones/{collection_id}/{segment_id}/{vec_id}
336        let prefix = b"tombstones/";
337        let entries = self
338            .db
339            .scan(txn, prefix)
340            .map_err(|e| Error::Storage(e.to_string()))?;
341
342        self.db
343            .commit(txn)
344            .map_err(|e| Error::Storage(e.to_string()))?;
345
346        // Collect keys matching this segment_id
347        let mut keys_to_delete = Vec::new();
348        for (key, value) in &entries {
349            if let Ok(json) = serde_json::from_slice::<serde_json::Value>(value) {
350                if json["segment_id"].as_u64() == Some(segment_id) {
351                    keys_to_delete.push(key.clone());
352                }
353            }
354        }
355
356        // Delete all matching tombstones in a single transaction
357        if !keys_to_delete.is_empty() {
358            let txn = self
359                .db
360                .begin_transaction()
361                .map_err(|e| Error::Storage(e.to_string()))?;
362
363            for key in &keys_to_delete {
364                self.db
365                    .delete(txn, key)
366                    .map_err(|e| Error::Storage(e.to_string()))?;
367            }
368
369            self.db
370                .commit(txn)
371                .map_err(|e| Error::Storage(e.to_string()))?;
372
373            tracing::info!(
374                "clear_tombstones: removed {} tombstones for segment {}",
375                keys_to_delete.len(),
376                segment_id
377            );
378        }
379
380        Ok(())
381    }
382
383    /// Get total vector count for collection
384    pub fn get_vector_count(&self, collection_id: i64) -> Result<u64> {
385        let segments = self.get_segments(collection_id)?;
386        let total: u64 = segments.iter().map(|s| s.n_vec as u64).sum();
387        Ok(total)
388    }
389
390    /// Begin a transaction
391    pub fn begin_transaction(&self) -> Result<()> {
392        // SochDB handles transactions internally
393        Ok(())
394    }
395
396    /// Commit transaction
397    pub fn commit(&self) -> Result<()> {
398        // SochDB handles transactions internally
399        Ok(())
400    }
401
402    /// Rollback transaction
403    pub fn rollback(&self) -> Result<()> {
404        // SochDB handles transactions internally
405        Ok(())
406    }
407
408    /// Execute checkpoint
409    pub fn checkpoint(&self) -> Result<()> {
410        // SochDB handles checkpointing internally
411        Ok(())
412    }
413}
414
415/// Collection info from catalog
416#[derive(Debug, Clone, Serialize, Deserialize)]
417pub struct CollectionInfo {
418    pub id: i64,
419    pub name: String,
420    pub dim: u32,
421    pub metric: String,
422    pub config_json: String,
423}
424
425impl CollectionInfo {
426    /// Parse config from JSON
427    pub fn config(&self) -> Result<EngineConfig> {
428        serde_json::from_str(&self.config_json).map_err(|e| Error::Serialization(e.to_string()))
429    }
430}
431
432/// Segment info from catalog
433#[derive(Debug, Clone)]
434pub struct SegmentInfo {
435    pub id: u64,
436    pub path: String,
437    pub state: SegmentState,
438    pub n_vec: u32,
439    pub min_vec_id: Option<u32>,
440    pub max_vec_id: Option<u32>,
441}
442
443impl SegmentState {
444    fn to_string(&self) -> &'static str {
445        match self {
446            SegmentState::Mutable => "mutable",
447            SegmentState::Sealing => "sealing",
448            SegmentState::Sealed => "sealed",
449            SegmentState::Compacting => "compacting",
450            SegmentState::Deleted => "deleted",
451        }
452    }
453
454    fn from_str(s: &str) -> Self {
455        match s {
456            "mutable" => SegmentState::Mutable,
457            "sealing" => SegmentState::Sealing,
458            "sealed" => SegmentState::Sealed,
459            "compacting" => SegmentState::Compacting,
460            "deleted" => SegmentState::Deleted,
461            _ => SegmentState::Sealed,
462        }
463    }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end pass over collection, segment and tombstone bookkeeping.
    #[test]
    fn test_catalog_operations() {
        let catalog = Catalog::open_memory().unwrap();

        // A new collection receives a positive id.
        let engine_config = EngineConfig::with_dim(768);
        let collection_id = catalog.create_collection("test", &engine_config).unwrap();
        assert!(collection_id > 0);

        // The stored dimension is readable by name.
        let stored = catalog.get_collection("test").unwrap();
        assert_eq!(stored.dim, 768);

        // Register one sealed segment.
        let sealed_segment = SegmentInfo {
            id: 1,
            path: "/data/segment_1.seg".to_string(),
            state: SegmentState::Sealed,
            n_vec: 10000,
            min_vec_id: Some(0),
            max_vec_id: Some(9999),
        };
        catalog.add_segment(collection_id, &sealed_segment).unwrap();

        // It is listed for its collection.
        let listed = catalog.get_segments(collection_id).unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].n_vec, 10000);

        // A tombstone on segment 1 is reported back.
        catalog.add_tombstone(collection_id, 1, 500).unwrap();
        let tombstones = catalog.get_tombstones(1).unwrap();
        assert_eq!(tombstones, vec![500]);
    }

    /// Looking up an unknown name yields `Error::CollectionNotFound`.
    #[test]
    fn test_collection_not_found() {
        let catalog = Catalog::open_memory().unwrap();
        let lookup = catalog.get_collection("nonexistent");
        assert!(matches!(lookup, Err(Error::CollectionNotFound(_))));
    }
}
511}