// lumosai_vector_memory/storage.rs

1//! Memory vector storage implementation
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Instant;
6use tokio::sync::RwLock;
7use async_trait::async_trait;
8
9use lumosai_vector_core::prelude::*;
10use crate::{MemoryConfig, MemoryIndex};
11
/// High-performance in-memory vector storage.
///
/// All shared state lives behind `Arc`s, so cloning this type (see the manual
/// `Clone` impl in this file) yields another handle onto the same indexes,
/// statistics, monitor, and cache rather than a deep copy.
pub struct MemoryVectorStorage {
    /// Storage configuration (capacity hints, limits, thresholds)
    config: MemoryConfig,
    /// Indexes stored in memory, keyed by index name
    indexes: Arc<RwLock<HashMap<String, MemoryIndex>>>,
    /// Aggregate storage statistics (index/vector counts, memory, search timings)
    stats: Arc<RwLock<StorageStats>>,
    /// Performance monitor fed a duration per search operation
    performance_monitor: Arc<PerformanceMonitor>,
    /// Search result cache, keyed by a string derived from the request
    search_cache: Arc<LRUCache<String, SearchResponse>>,
}
25
/// Aggregate statistics across all indexes held by a `MemoryVectorStorage`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Total number of indexes
    index_count: usize,
    /// Total number of vectors across all indexes
    total_vectors: usize,
    /// Estimated total memory usage in bytes
    memory_usage_bytes: u64,
    /// Number of search operations executed (cache hits return early and are
    /// not counted here)
    search_count: u64,
    /// Total search time in milliseconds, summed over counted searches
    total_search_time_ms: u64,
}
40
41impl MemoryVectorStorage {
42    /// Create a new memory vector storage with default configuration
43    pub async fn new() -> Result<Self> {
44        Self::with_config(MemoryConfig::default()).await
45    }
46    
47    /// Create a new memory vector storage with specified capacity
48    pub async fn with_capacity(capacity: usize) -> Result<Self> {
49        let config = MemoryConfig::new().with_initial_capacity(capacity);
50        Self::with_config(config).await
51    }
52    
53    /// Create a new memory vector storage with custom configuration
54    pub async fn with_config(config: MemoryConfig) -> Result<Self> {
55        let cache_config = CacheConfig {
56            max_entries: 100,
57            ttl: std::time::Duration::from_secs(300), // 5 minutes
58            enable_lru: true,
59            stats_interval: std::time::Duration::from_secs(60),
60        };
61
62        Ok(Self {
63            config,
64            indexes: Arc::new(RwLock::new(HashMap::new())),
65            stats: Arc::new(RwLock::new(StorageStats::default())),
66            performance_monitor: Arc::new(PerformanceMonitor::new()),
67            search_cache: Arc::new(LRUCache::new(cache_config)),
68        })
69    }
70    
71    /// Get storage statistics
72    pub async fn get_stats(&self) -> StorageStats {
73        self.stats.read().await.clone()
74    }
75    
76    /// Get memory usage in bytes
77    pub async fn memory_usage(&self) -> u64 {
78        self.stats.read().await.memory_usage_bytes
79    }
80    
81    /// Cleanup unused memory (if configured)
82    pub async fn cleanup(&self) -> Result<()> {
83        if let Some(threshold_mb) = self.config.memory_threshold_mb {
84            let current_usage_mb = self.memory_usage().await / (1024 * 1024);
85            if current_usage_mb > threshold_mb as u64 {
86                // Clear search cache to free memory
87                self.search_cache.clear().await;
88
89                // Trigger garbage collection or other cleanup
90                // For now, this is a no-op, but could be extended
91            }
92        }
93        Ok(())
94    }
95
96    /// Get performance metrics
97    pub async fn get_performance_metrics(&self) -> lumosai_vector_core::PerformanceMetrics {
98        self.performance_monitor.get_metrics().await
99    }
100
101    /// Get cache statistics
102    pub async fn get_cache_stats(&self) -> lumosai_vector_core::CacheStats {
103        self.search_cache.get_stats().await
104    }
105}
106
107#[async_trait]
108impl VectorStorage for MemoryVectorStorage {
109    type Config = MemoryConfig;
110    
111    async fn create_index(&self, config: IndexConfig) -> Result<()> {
112        let mut indexes = self.indexes.write().await;
113        
114        if indexes.contains_key(&config.name) {
115            return Err(VectorError::index_already_exists(&config.name));
116        }
117        
118        let index = MemoryIndex::new(config.clone(), &self.config)?;
119        indexes.insert(config.name.clone(), index);
120        
121        // Update stats
122        let mut stats = self.stats.write().await;
123        stats.index_count += 1;
124        
125        Ok(())
126    }
127    
128    async fn list_indexes(&self) -> Result<Vec<String>> {
129        let indexes = self.indexes.read().await;
130        Ok(indexes.keys().cloned().collect())
131    }
132    
133    async fn describe_index(&self, index_name: &str) -> Result<IndexInfo> {
134        let indexes = self.indexes.read().await;
135        let index = indexes.get(index_name)
136            .ok_or_else(|| VectorError::index_not_found(index_name))?;
137        
138        Ok(index.get_info())
139    }
140    
141    async fn delete_index(&self, index_name: &str) -> Result<()> {
142        let mut indexes = self.indexes.write().await;
143        
144        if !indexes.contains_key(index_name) {
145            return Err(VectorError::index_not_found(index_name));
146        }
147        
148        let removed_index = indexes.remove(index_name).unwrap();
149        
150        // Update stats
151        let mut stats = self.stats.write().await;
152        stats.index_count -= 1;
153        stats.total_vectors -= removed_index.vector_count();
154        stats.memory_usage_bytes -= removed_index.memory_usage();
155        
156        Ok(())
157    }
158    
159    async fn upsert_documents(&self, index_name: &str, documents: Vec<Document>) -> Result<Vec<DocumentId>> {
160        let mut indexes = self.indexes.write().await;
161        let index = indexes.get_mut(index_name)
162            .ok_or_else(|| VectorError::index_not_found(index_name))?;
163        
164        let mut document_ids = Vec::new();
165        let mut vectors_added = 0;
166        let mut memory_added = 0;
167        
168        for document in documents {
169            let embedding = document.embedding.as_ref()
170                .ok_or_else(|| VectorError::InvalidVector("Document must have embedding".to_string()))?;
171            
172            if embedding.len() != index.dimension() {
173                return Err(VectorError::dimension_mismatch(index.dimension(), embedding.len()));
174            }
175            
176            // Check capacity limits
177            if let Some(max_vectors) = self.config.max_vectors_per_index {
178                if index.vector_count() >= max_vectors {
179                    return Err(VectorError::ResourceLimitExceeded(
180                        format!("Index {} has reached maximum capacity of {} vectors", index_name, max_vectors)
181                    ));
182                }
183            }
184            
185            let was_new = index.upsert_document(document.clone())?;
186            document_ids.push(document.id.clone());
187            
188            if was_new {
189                vectors_added += 1;
190                memory_added += index.estimate_document_memory(&document);
191            }
192        }
193        
194        // Update stats
195        let mut stats = self.stats.write().await;
196        stats.total_vectors += vectors_added;
197        stats.memory_usage_bytes += memory_added;
198        
199        Ok(document_ids)
200    }
201    
202    async fn search(&self, request: SearchRequest) -> Result<SearchResponse> {
203        let start_time = Instant::now();
204
205        // Generate cache key for the search request
206        let cache_key = format!("{}_{}_{}",
207            request.index_name,
208            request.top_k,
209            serde_json::to_string(&request.query).unwrap_or_default()
210        );
211
212        // Check cache first
213        if let Some(cached_response) = self.search_cache.get(&cache_key).await {
214            let duration = start_time.elapsed();
215            self.performance_monitor.record_operation(duration, true).await;
216            return Ok(cached_response);
217        }
218
219        let indexes = self.indexes.read().await;
220        let index = indexes.get(&request.index_name)
221            .ok_or_else(|| VectorError::index_not_found(&request.index_name))?;
222
223        let results = index.search(&request)?;
224
225        let execution_time_ms = start_time.elapsed().as_millis() as u64;
226        let duration = start_time.elapsed();
227
228        // Update stats
229        {
230            let mut stats = self.stats.write().await;
231            stats.search_count += 1;
232            stats.total_search_time_ms += execution_time_ms;
233        }
234
235        let response = SearchResponse::new(results)
236            .with_execution_time(execution_time_ms);
237
238        // Cache the response
239        self.search_cache.set(cache_key, response.clone()).await;
240
241        // Record performance metrics
242        self.performance_monitor.record_operation(duration, true).await;
243
244        Ok(response)
245    }
246    
247    async fn update_document(&self, index_name: &str, document: Document) -> Result<()> {
248        let mut indexes = self.indexes.write().await;
249        let index = indexes.get_mut(index_name)
250            .ok_or_else(|| VectorError::index_not_found(index_name))?;
251        
252        if let Some(embedding) = &document.embedding {
253            if embedding.len() != index.dimension() {
254                return Err(VectorError::dimension_mismatch(index.dimension(), embedding.len()));
255            }
256        }
257        
258        index.update_document(document)?;
259        Ok(())
260    }
261    
262    async fn delete_documents(&self, index_name: &str, ids: Vec<DocumentId>) -> Result<()> {
263        let mut indexes = self.indexes.write().await;
264        let index = indexes.get_mut(index_name)
265            .ok_or_else(|| VectorError::index_not_found(index_name))?;
266        
267        let mut vectors_removed = 0;
268        let mut memory_freed = 0;
269        
270        for id in ids {
271            if let Some(removed_doc) = index.delete_document(&id)? {
272                vectors_removed += 1;
273                memory_freed += index.estimate_document_memory(&removed_doc);
274            }
275        }
276        
277        // Update stats
278        let mut stats = self.stats.write().await;
279        stats.total_vectors -= vectors_removed;
280        stats.memory_usage_bytes = stats.memory_usage_bytes.saturating_sub(memory_freed);
281        
282        Ok(())
283    }
284    
285    async fn get_documents(&self, index_name: &str, ids: Vec<DocumentId>, include_vectors: bool) -> Result<Vec<Document>> {
286        let indexes = self.indexes.read().await;
287        let index = indexes.get(index_name)
288            .ok_or_else(|| VectorError::index_not_found(index_name))?;
289        
290        let mut documents = Vec::new();
291        for id in ids {
292            if let Some(mut document) = index.get_document(&id)? {
293                if !include_vectors {
294                    document.embedding = None;
295                }
296                documents.push(document);
297            }
298        }
299        
300        Ok(documents)
301    }
302    
303    async fn health_check(&self) -> Result<()> {
304        // Check if we can acquire locks
305        let _indexes = self.indexes.read().await;
306        let _stats = self.stats.read().await;
307        
308        // Check memory usage if configured
309        if let Some(threshold_mb) = self.config.memory_threshold_mb {
310            let current_usage_mb = self.memory_usage().await / (1024 * 1024);
311            if current_usage_mb > threshold_mb as u64 * 2 {
312                return Err(VectorError::ResourceLimitExceeded(
313                    format!("Memory usage {} MB exceeds critical threshold", current_usage_mb)
314                ));
315            }
316        }
317        
318        Ok(())
319    }
320    
321    fn backend_info(&self) -> BackendInfo {
322        BackendInfo::new("memory", env!("CARGO_PKG_VERSION"))
323            .with_feature("high_performance")
324            .with_feature("thread_safe")
325            .with_feature("complex_filtering")
326            .with_feature("multiple_metrics")
327            .with_metadata("initial_capacity", MetadataValue::Integer(self.config.initial_capacity as i64))
328            .with_metadata("approximate_search", MetadataValue::Boolean(self.config.enable_approximate))
329    }
330}
331
332impl Clone for MemoryVectorStorage {
333    fn clone(&self) -> Self {
334        Self {
335            config: self.config.clone(),
336            indexes: Arc::clone(&self.indexes),
337            stats: Arc::clone(&self.stats),
338            performance_monitor: Arc::clone(&self.performance_monitor),
339            search_cache: Arc::clone(&self.search_cache),
340        }
341    }
342}
343
344impl Clone for StorageStats {
345    fn clone(&self) -> Self {
346        Self {
347            index_count: self.index_count,
348            total_vectors: self.total_vectors,
349            memory_usage_bytes: self.memory_usage_bytes,
350            search_count: self.search_count,
351            total_search_time_ms: self.total_search_time_ms,
352        }
353    }
354}