Skip to main content

vectrust_core/
index.rs

1use crate::*;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use uuid::Uuid;
6use chrono::Utc;
7use serde::{Serialize, Deserialize};
8
9// We'll define a trait here to avoid cyclic dependencies
10use async_trait::async_trait;
11
12#[async_trait]
13pub trait StorageBackend: Send + Sync {
14    async fn exists(&self) -> bool;
15    async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()>;
16    async fn get_item(&self, id: &uuid::Uuid) -> Result<Option<VectorItem>>;
17    async fn insert_item(&mut self, item: &VectorItem) -> Result<()>;
18    async fn insert_items(&mut self, items: &[VectorItem]) -> Result<()> {
19        // Default implementation - can be overridden for better performance
20        for item in items {
21            self.insert_item(item).await?;
22        }
23        Ok(())
24    }
25    async fn update_item(&mut self, item: &VectorItem) -> Result<()>;
26    async fn delete_item(&mut self, id: &uuid::Uuid) -> Result<()>;
27    async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>>;
28    async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>>;
29    async fn begin_transaction(&mut self) -> Result<()>;
30    async fn commit_transaction(&mut self) -> Result<()>;
31    async fn rollback_transaction(&mut self) -> Result<()>;
32    async fn delete_index(&mut self) -> Result<()>;
33    async fn get_stats(&self) -> Result<IndexStats>;
34}
35
36/// Configuration matching Node.js CreateIndexConfig
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct CreateIndexConfig {
39    #[serde(default = "default_version")]
40    pub version: u32,
41    
42    #[serde(default)]
43    pub delete_if_exists: bool,
44    
45    #[serde(default = "default_distance_metric")]
46    pub distance_metric: DistanceMetric,
47    
48    #[serde(default)]
49    pub metadata_config: MetadataConfig,
50    
51    #[serde(default)]
52    pub hnsw_config: HnswConfig,
53}
54
/// Serde fallback for `CreateIndexConfig::version`: configurations that omit
/// the field are treated as version 1.
fn default_version() -> u32 {
    1
}
56fn default_distance_metric() -> DistanceMetric { DistanceMetric::Cosine }
57
58impl Default for CreateIndexConfig {
59    fn default() -> Self {
60        Self {
61            version: default_version(),
62            delete_if_exists: false,
63            distance_metric: default_distance_metric(),
64            metadata_config: MetadataConfig::default(),
65            hnsw_config: HnswConfig::default(),
66        }
67    }
68}
69
70/// Main LocalIndex implementation with full Node.js API compatibility
71pub struct LocalIndex {
72    #[allow(dead_code)]
73    path: PathBuf,
74    #[allow(dead_code)]
75    index_name: String,
76    storage: Arc<RwLock<Box<dyn StorageBackend>>>,
77    config: Arc<RwLock<Option<CreateIndexConfig>>>,
78}
79
80impl LocalIndex {
81    /// Creates a new LocalIndex instance with a provided storage backend
82    pub fn with_storage(
83        folder_path: impl AsRef<Path>, 
84        index_name: Option<String>,
85        storage: Box<dyn StorageBackend>
86    ) -> Result<Self> {
87        let path = folder_path.as_ref().to_path_buf();
88        let index_name = index_name.unwrap_or_else(|| "index.json".to_string());
89        
90        Ok(Self {
91            path,
92            index_name,
93            storage: Arc::new(RwLock::new(storage)),
94            config: Arc::new(RwLock::new(None)),
95        })
96    }
97    
98    /// Creates a new LocalIndex instance (legacy API, requires storage dependency)
99    /// This will be deprecated - use with_storage instead
100    pub fn new<P: AsRef<Path>>(folder_path: P, index_name: Option<String>) -> Result<Self> {
101        let path = folder_path.as_ref().to_path_buf();
102        let index_name = index_name.unwrap_or_else(|| "index.json".to_string());
103        
104        // Use dummy storage for backward compatibility
105        // Real implementations should use with_storage() method
106        let storage = create_dummy_storage();
107        
108        Ok(Self {
109            path,
110            index_name,
111            storage: Arc::new(RwLock::new(storage)),
112            config: Arc::new(RwLock::new(None)),
113        })
114    }
115    
116    /// Creates an index with optional configuration
117    pub async fn create_index(&self, config: Option<CreateIndexConfig>) -> Result<()> {
118        let config = config.unwrap_or_default();
119        
120        // Handle delete_if_exists
121        if config.delete_if_exists && self.is_index_created().await {
122            self.delete_index().await?;
123        }
124        
125        // Create index through storage backend
126        let mut storage = self.storage.write().await;
127        storage.create_index(&config).await?;
128        
129        // Store config
130        *self.config.write().await = Some(config);
131        
132        Ok(())
133    }
134    
135    /// Checks if index exists
136    pub async fn is_index_created(&self) -> bool {
137        let storage = self.storage.read().await;
138        storage.exists().await
139    }
140    
141    /// Inserts a new item
142    pub async fn insert_item(&self, item: impl Into<VectorItem>) -> Result<VectorItem> {
143        let mut item = item.into();
144        
145        // Ensure ID
146        if item.id == Uuid::default() {
147            item.id = Uuid::new_v4();
148        }
149        
150        // Update timestamps
151        let now = Utc::now();
152        item.created_at = now;
153        item.updated_at = now;
154        
155        // Insert through storage
156        let mut storage = self.storage.write().await;
157        storage.insert_item(&item).await?;
158        
159        Ok(item)
160    }
161    
162    /// Updates an existing item (partial update)
163    pub async fn update_item(&self, update: UpdateRequest) -> Result<UpdateResult> {
164        let mut storage = self.storage.write().await;
165        
166        // Get existing item
167        let mut item = storage.get_item(&update.id).await?
168            .ok_or(VectraError::ItemNotFound)?;
169        
170        // Apply updates
171        if let Some(vector) = update.vector {
172            item.vector = vector;
173        }
174        
175        if let Some(metadata) = update.metadata {
176            merge_json(&mut item.metadata, metadata);
177        }
178        
179        // Update version and timestamp
180        item.version += 1;
181        item.updated_at = Utc::now();
182        
183        // Save
184        storage.update_item(&item).await?;
185        
186        Ok(UpdateResult {
187            id: item.id,
188            version: item.version,
189        })
190    }
191    
192    /// Upserts an item (insert or update)
193    pub async fn upsert_item(&self, item: impl Into<VectorItem>) -> Result<VectorItem> {
194        let item = item.into();
195        let mut storage = self.storage.write().await;
196        
197        if storage.get_item(&item.id).await?.is_some() {
198            // Update existing
199            storage.update_item(&item).await?;
200        } else {
201            // Insert new
202            storage.insert_item(&item).await?;
203        }
204        
205        Ok(item)
206    }
207    
208    /// Gets an item by ID
209    pub async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
210        let storage = self.storage.read().await;
211        storage.get_item(id).await
212    }
213    
214    /// Deletes an item
215    pub async fn delete_item(&self, id: &Uuid) -> Result<()> {
216        let mut storage = self.storage.write().await;
217        storage.delete_item(id).await
218    }
219    
220    /// Lists all items with optional pagination
221    pub async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
222        let storage = self.storage.read().await;
223        storage.list_items(options).await
224    }
225    
226    /// Query items - maintains Node.js signature compatibility
227    pub async fn query_items(
228        &self,
229        vector: Vec<f32>,
230        top_k: Option<u32>,
231        filter: Option<serde_json::Value>,
232    ) -> Result<Vec<QueryResult>> {
233        self.query_items_extended(vector, None, top_k, filter).await
234    }
235    
236    /// Extended query with text search
237    pub async fn query_items_extended(
238        &self,
239        vector: Vec<f32>,
240        text_query: Option<String>,
241        top_k: Option<u32>,
242        filter: Option<serde_json::Value>,
243    ) -> Result<Vec<QueryResult>> {
244        let storage = self.storage.read().await;
245        
246        let query = Query {
247            vector: Some(vector),
248            text: text_query,
249            top_k: top_k.unwrap_or(10) as usize,
250            filter,
251        };
252        
253        storage.query_items(&query).await
254    }
255    
256    /// Begins an update transaction
257    pub async fn begin_update(&self) -> Result<()> {
258        let mut storage = self.storage.write().await;
259        storage.begin_transaction().await
260    }
261    
262    /// Ends an update transaction
263    pub async fn end_update(&self) -> Result<()> {
264        let mut storage = self.storage.write().await;
265        storage.commit_transaction().await
266    }
267    
268    /// Cancels an update transaction (now async for safety)
269    pub async fn cancel_update(&self) -> Result<()> {
270        let mut storage = self.storage.write().await;
271        storage.rollback_transaction().await
272    }
273    
274    /// Deletes the entire index
275    pub async fn delete_index(&self) -> Result<()> {
276        let mut storage = self.storage.write().await;
277        storage.delete_index().await
278    }
279    
280    /// Gets index statistics
281    pub async fn get_stats(&self) -> Result<IndexStats> {
282        let storage = self.storage.read().await;
283        storage.get_stats().await
284    }
285}
286
287/// Helper function to merge JSON objects (for metadata updates)
288fn merge_json(target: &mut serde_json::Value, source: serde_json::Value) {
289    if let (Some(target_obj), Some(source_obj)) = (target.as_object_mut(), source.as_object()) {
290        for (key, value) in source_obj {
291            target_obj.insert(key.clone(), value.clone());
292        }
293    } else {
294        *target = source;
295    }
296}
297
/// No-op storage backend kept for backward compatibility.
///
/// It holds no state; its `StorageBackend` implementation accepts every write
/// without storing anything and answers every read with an empty result.
struct DummyStorage;
300
301#[async_trait]
302impl StorageBackend for DummyStorage {
303    async fn exists(&self) -> bool { false }
304    async fn create_index(&mut self, _config: &CreateIndexConfig) -> Result<()> { Ok(()) }
305    async fn get_item(&self, _id: &uuid::Uuid) -> Result<Option<VectorItem>> { Ok(None) }
306    async fn insert_item(&mut self, _item: &VectorItem) -> Result<()> { Ok(()) }
307    async fn update_item(&mut self, _item: &VectorItem) -> Result<()> { Ok(()) }
308    async fn delete_item(&mut self, _id: &uuid::Uuid) -> Result<()> { Ok(()) }
309    async fn list_items(&self, _options: Option<ListOptions>) -> Result<Vec<VectorItem>> { Ok(Vec::new()) }
310    async fn query_items(&self, _query: &Query) -> Result<Vec<QueryResult>> { Ok(Vec::new()) }
311    async fn begin_transaction(&mut self) -> Result<()> { Ok(()) }
312    async fn commit_transaction(&mut self) -> Result<()> { Ok(()) }
313    async fn rollback_transaction(&mut self) -> Result<()> { Ok(()) }
314    async fn delete_index(&mut self) -> Result<()> { Ok(()) }
315    async fn get_stats(&self) -> Result<IndexStats> { 
316        Ok(IndexStats {
317            items: 0,
318            size: 0,
319            dimensions: None,
320            distance_metric: DistanceMetric::Cosine,
321        })
322    }
323}
324
325fn create_dummy_storage() -> Box<dyn StorageBackend> {
326    Box::new(DummyStorage)
327}
328