vectradb_storage/
lib.rs

1use ndarray::Array1;
2use serde::{Deserialize, Serialize};
3use sled::{Db, Tree};
4use std::collections::HashMap;
5use std::sync::Arc;
6use vectradb_components::{
7    DatabaseStats, VectorDatabase, VectorDocument, VectorMetadata, VectraDBError,
8};
9use vectradb_search::{
10    AdvancedSearch, HNSWIndex, LSHIndex, PQIndex, SearchAlgorithm, SearchConfig,
11};
12
13/// Persistent vector database with multiple indexing strategies
14pub struct PersistentVectorDB {
15    storage: Arc<Db>,
16    vectors_tree: Tree,
17    metadata_tree: Tree,
18    index: Box<dyn AdvancedSearch + Send + Sync>,
19    config: DatabaseConfig,
20    stats: DatabaseStats,
21}
22
23/// Database configuration
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct DatabaseConfig {
26    pub data_dir: String,
27    pub search_algorithm: SearchAlgorithm,
28    pub index_config: SearchConfig,
29    pub auto_flush: bool,
30    pub cache_size: usize,
31}
32
33impl Default for DatabaseConfig {
34    fn default() -> Self {
35        Self {
36            data_dir: "./vectradb_data".to_string(),
37            search_algorithm: SearchAlgorithm::HNSW,
38            index_config: SearchConfig::default(),
39            auto_flush: true,
40            cache_size: 1000,
41        }
42    }
43}
44
45impl PersistentVectorDB {
46    /// Create a new persistent vector database
47    pub async fn new(config: DatabaseConfig) -> Result<Self, VectraDBError> {
48        let db = sled::open(&config.data_dir)
49            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
50
51        let vectors_tree = db
52            .open_tree("vectors")
53            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
54
55        let metadata_tree = db
56            .open_tree("metadata")
57            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
58
59        // Create search index based on configuration
60        let index: Box<dyn AdvancedSearch + Send + Sync> = match config.search_algorithm {
61            SearchAlgorithm::HNSW => Box::new(HNSWIndex::new(
62                config.index_config.dimension.unwrap_or(384),
63                config.index_config.m,
64                config.index_config.ef_construction,
65            )),
66            SearchAlgorithm::LSH => Box::new(LSHIndex::new(
67                config.index_config.dimension.unwrap_or(384),
68                config.index_config.num_hashes,
69            )),
70            SearchAlgorithm::PQ => Box::new(PQIndex::new(
71                config.index_config.dimension.unwrap_or(384),
72                config.index_config.num_subspaces.unwrap_or(8),
73                config.index_config.codes_per_subspace.unwrap_or(256),
74            )),
75            _ => {
76                return Err(VectraDBError::DatabaseError(anyhow::anyhow!(
77                    "Unsupported search algorithm"
78                )))
79            }
80        };
81
82        let mut db_instance = Self {
83            storage: Arc::new(db),
84            vectors_tree,
85            metadata_tree,
86            index,
87            config,
88            stats: DatabaseStats::default(),
89        };
90
91        // Load existing data and rebuild index
92        db_instance.rebuild_index().await?;
93
94        Ok(db_instance)
95    }
96
97    /// Rebuild the search index from persistent storage
98    async fn rebuild_index(&mut self) -> Result<(), VectraDBError> {
99        let mut documents = Vec::new();
100
101        for result in self.vectors_tree.iter() {
102            let (id_bytes, vector_bytes) =
103                result.map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
104
105            let id = String::from_utf8(id_bytes.to_vec())
106                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
107
108            // Get metadata
109            let metadata_bytes = self
110                .metadata_tree
111                .get(&id)
112                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?
113                .ok_or_else(|| VectraDBError::VectorNotFound { id: id.clone() })?;
114
115            let metadata: VectorMetadata = bincode::deserialize(&metadata_bytes)
116                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
117
118            // Deserialize vector data
119            let data: Array1<f32> = bincode::deserialize(&vector_bytes)
120                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
121
122            let document = VectorDocument { metadata, data };
123            documents.push(document);
124        }
125
126        // Build index with loaded documents
127        self.index.build_index(documents)?;
128
129        // Update stats
130        self.stats.total_vectors = self.vectors_tree.len();
131
132        Ok(())
133    }
134
135    /// Serialize and store vector data
136    #[allow(dead_code)]
137    async fn store_vector(&self, id: &str, document: &VectorDocument) -> Result<(), VectraDBError> {
138        // Serialize vector data
139        let vector_bytes = bincode::serialize(&document.data)
140            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
141
142        let metadata_bytes = bincode::serialize(&document.metadata)
143            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
144
145        // Store in database
146        self.vectors_tree
147            .insert(id.as_bytes(), vector_bytes)
148            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
149
150        self.metadata_tree
151            .insert(id.as_bytes(), metadata_bytes)
152            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
153
154        // Flush if auto-flush is enabled
155        if self.config.auto_flush {
156            self.storage
157                .flush_async()
158                .await
159                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
160        }
161
162        Ok(())
163    }
164
165    /// Serialize and store vector data (sync version)
166    fn store_vector_sync(&self, id: &str, document: &VectorDocument) -> Result<(), VectraDBError> {
167        // Serialize vector data
168        let vector_bytes = bincode::serialize(&document.data)
169            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
170
171        let metadata_bytes = bincode::serialize(&document.metadata)
172            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
173
174        // Store in database
175        self.vectors_tree
176            .insert(id.as_bytes(), vector_bytes)
177            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
178
179        self.metadata_tree
180            .insert(id.as_bytes(), metadata_bytes)
181            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
182
183        // Flush if auto-flush is enabled
184        if self.config.auto_flush {
185            self.storage
186                .flush()
187                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
188        }
189
190        Ok(())
191    }
192
193    /// Load vector from persistent storage
194    #[allow(dead_code)]
195    async fn load_vector(&self, id: &str) -> Result<VectorDocument, VectraDBError> {
196        // Load metadata
197        let metadata_bytes = self
198            .metadata_tree
199            .get(id.as_bytes())
200            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?
201            .ok_or_else(|| VectraDBError::VectorNotFound { id: id.to_string() })?;
202
203        let metadata: VectorMetadata = bincode::deserialize(&metadata_bytes)
204            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
205
206        // Load vector data
207        let vector_bytes = self
208            .vectors_tree
209            .get(id.as_bytes())
210            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?
211            .ok_or_else(|| VectraDBError::VectorNotFound { id: id.to_string() })?;
212
213        let data: Array1<f32> = bincode::deserialize(&vector_bytes)
214            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
215
216        Ok(VectorDocument { metadata, data })
217    }
218
219    /// Load vector from persistent storage (sync version)
220    fn load_vector_sync(&self, id: &str) -> Result<VectorDocument, VectraDBError> {
221        // Load metadata
222        let metadata_bytes = self
223            .metadata_tree
224            .get(id.as_bytes())
225            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?
226            .ok_or_else(|| VectraDBError::VectorNotFound { id: id.to_string() })?;
227
228        let metadata: VectorMetadata = bincode::deserialize(&metadata_bytes)
229            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
230
231        // Load vector data
232        let vector_bytes = self
233            .vectors_tree
234            .get(id.as_bytes())
235            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?
236            .ok_or_else(|| VectraDBError::VectorNotFound { id: id.to_string() })?;
237
238        let data: Array1<f32> = bincode::deserialize(&vector_bytes)
239            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
240
241        Ok(VectorDocument { metadata, data })
242    }
243
244    /// Remove vector from persistent storage
245    #[allow(dead_code)]
246    async fn remove_stored_vector(&self, id: &str) -> Result<(), VectraDBError> {
247        self.vectors_tree
248            .remove(id.as_bytes())
249            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
250
251        self.metadata_tree
252            .remove(id.as_bytes())
253            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
254
255        if self.config.auto_flush {
256            self.storage
257                .flush_async()
258                .await
259                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
260        }
261
262        Ok(())
263    }
264
265    /// Remove vector from persistent storage (sync version)
266    fn remove_stored_vector_sync(&self, id: &str) -> Result<(), VectraDBError> {
267        self.vectors_tree
268            .remove(id.as_bytes())
269            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
270
271        self.metadata_tree
272            .remove(id.as_bytes())
273            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
274
275        if self.config.auto_flush {
276            self.storage
277                .flush()
278                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
279        }
280
281        Ok(())
282    }
283}
284
285impl VectorDatabase for PersistentVectorDB {
286    fn create_vector(
287        &mut self,
288        id: String,
289        vector: Array1<f32>,
290        tags: Option<HashMap<String, String>>,
291    ) -> Result<(), VectraDBError> {
292        let document = vectradb_components::vector_operations::create_vector_document(
293            id.clone(),
294            vector,
295            tags,
296        )?;
297
298        // Store in index
299        self.index.insert(document.clone())?;
300
301        // Store in persistent storage (sync operation)
302        self.store_vector_sync(&id, &document)?;
303
304        self.stats.total_vectors += 1;
305        Ok(())
306    }
307
308    fn get_vector(&self, id: &str) -> Result<VectorDocument, VectraDBError> {
309        self.load_vector_sync(id)
310    }
311
312    fn update_vector(
313        &mut self,
314        id: &str,
315        vector: Array1<f32>,
316        tags: Option<HashMap<String, String>>,
317    ) -> Result<(), VectraDBError> {
318        // Load existing document
319        let existing_doc = self.load_vector_sync(id)?;
320
321        // Update document
322        let updated_doc = vectradb_components::vector_operations::update_vector_document(
323            existing_doc,
324            vector,
325            tags,
326        )?;
327
328        // Update in index
329        self.index.update(id, updated_doc.clone())?;
330
331        // Update in persistent storage
332        self.store_vector_sync(id, &updated_doc)?;
333
334        Ok(())
335    }
336
337    fn delete_vector(&mut self, id: &str) -> Result<(), VectraDBError> {
338        // Remove from index
339        self.index.remove(id)?;
340
341        // Remove from persistent storage
342        self.remove_stored_vector_sync(id)?;
343
344        self.stats.total_vectors -= 1;
345        Ok(())
346    }
347
348    fn upsert_vector(
349        &mut self,
350        id: String,
351        vector: Array1<f32>,
352        tags: Option<HashMap<String, String>>,
353    ) -> Result<(), VectraDBError> {
354        if self
355            .vectors_tree
356            .contains_key(id.as_bytes())
357            .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?
358        {
359            self.update_vector(&id, vector, tags)
360        } else {
361            self.create_vector(id, vector, tags)
362        }
363    }
364
365    fn search_similar(
366        &self,
367        query_vector: Array1<f32>,
368        top_k: usize,
369    ) -> Result<Vec<vectradb_components::SimilarityResult>, VectraDBError> {
370        let search_results = self.index.search(&query_vector, top_k)?;
371
372        let similarity_results: Vec<vectradb_components::SimilarityResult> = search_results
373            .into_iter()
374            .map(|result| {
375                let id = result.id.clone();
376                vectradb_components::SimilarityResult {
377                    id: result.id,
378                    score: result.similarity,
379                    metadata: vectradb_components::VectorMetadata {
380                        id,
381                        dimension: 0, // Will be filled from actual document
382                        created_at: 0,
383                        updated_at: 0,
384                        tags: HashMap::new(),
385                    },
386                }
387            })
388            .collect();
389
390        Ok(similarity_results)
391    }
392
393    fn list_vectors(&self) -> Result<Vec<String>, VectraDBError> {
394        let mut ids = Vec::new();
395
396        for result in self.vectors_tree.iter() {
397            let (id_bytes, _) =
398                result.map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
399
400            let id = String::from_utf8(id_bytes.to_vec())
401                .map_err(|e| VectraDBError::DatabaseError(anyhow::anyhow!(e)))?;
402
403            ids.push(id);
404        }
405
406        Ok(ids)
407    }
408
409    fn get_stats(&self) -> Result<DatabaseStats, VectraDBError> {
410        let index_stats = self.index.get_stats();
411
412        Ok(DatabaseStats {
413            total_vectors: self.stats.total_vectors,
414            dimension: self.config.index_config.dimension.unwrap_or(384),
415            memory_usage: index_stats.index_size_bytes as u64,
416        })
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use super::*;
423    use tempfile::tempdir;
424
425    #[tokio::test]
426    async fn test_persistent_db_creation() {
427        let temp_dir = tempdir().unwrap();
428        let config = DatabaseConfig {
429            data_dir: temp_dir.path().to_string_lossy().to_string(),
430            ..Default::default()
431        };
432
433        let db = PersistentVectorDB::new(config).await;
434        assert!(db.is_ok());
435    }
436
437    #[tokio::test]
438    async fn test_persistent_db_operations() {
439        let temp_dir = tempdir().unwrap();
440
441        // Create config with matching dimension
442        let search_config = SearchConfig {
443            dimension: Some(3),
444            ..Default::default()
445        };
446
447        let config = DatabaseConfig {
448            data_dir: temp_dir.path().to_string_lossy().to_string(),
449            index_config: search_config,
450            ..Default::default()
451        };
452
453        let mut db = PersistentVectorDB::new(config).await.unwrap();
454
455        let vector = Array1::from_vec(vec![1.0, 2.0, 3.0]);
456        assert!(db
457            .create_vector("test_id".to_string(), vector, None)
458            .is_ok());
459        assert!(db.get_vector("test_id").is_ok());
460        assert!(db.delete_vector("test_id").is_ok());
461    }
462}