//! In-memory vector storage implementation (lumosai_vector_memory/storage.rs).

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use async_trait::async_trait;

use lumosai_vector_core::prelude::*;
use crate::{MemoryConfig, MemoryIndex};
/// In-memory vector storage backed by per-index `MemoryIndex` structures.
///
/// All shared state lives behind `Arc`s, so cloned handles observe the same
/// indexes, statistics, search cache, and performance counters.
pub struct MemoryVectorStorage {
    /// Storage-wide settings (initial capacity, per-index vector limits,
    /// optional memory threshold, approximate-search flag).
    config: MemoryConfig,
    /// Named indexes, guarded by an async RwLock for concurrent readers.
    indexes: Arc<RwLock<HashMap<String, MemoryIndex>>>,
    /// Aggregate counters (index/vector counts, memory usage, search timing).
    stats: Arc<RwLock<StorageStats>>,
    /// Records per-operation latency; exposed via `get_performance_metrics`.
    performance_monitor: Arc<PerformanceMonitor>,
    /// LRU cache of search responses keyed by index name, top_k and the
    /// serialized query (see `search`).
    search_cache: Arc<LRUCache<String, SearchResponse>>,
}
25
/// Aggregate counters maintained by `MemoryVectorStorage`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Number of indexes currently present.
    index_count: usize,
    /// Total vectors across all indexes.
    total_vectors: usize,
    /// Estimated bytes consumed by stored documents.
    memory_usage_bytes: u64,
    /// Number of searches actually executed (cache hits are not counted;
    /// `search` only increments this after a cache miss).
    search_count: u64,
    /// Cumulative wall-clock time of those searches, in milliseconds.
    total_search_time_ms: u64,
}
40
41impl MemoryVectorStorage {
42 pub async fn new() -> Result<Self> {
44 Self::with_config(MemoryConfig::default()).await
45 }
46
47 pub async fn with_capacity(capacity: usize) -> Result<Self> {
49 let config = MemoryConfig::new().with_initial_capacity(capacity);
50 Self::with_config(config).await
51 }
52
53 pub async fn with_config(config: MemoryConfig) -> Result<Self> {
55 let cache_config = CacheConfig {
56 max_entries: 100,
57 ttl: std::time::Duration::from_secs(300), enable_lru: true,
59 stats_interval: std::time::Duration::from_secs(60),
60 };
61
62 Ok(Self {
63 config,
64 indexes: Arc::new(RwLock::new(HashMap::new())),
65 stats: Arc::new(RwLock::new(StorageStats::default())),
66 performance_monitor: Arc::new(PerformanceMonitor::new()),
67 search_cache: Arc::new(LRUCache::new(cache_config)),
68 })
69 }
70
71 pub async fn get_stats(&self) -> StorageStats {
73 self.stats.read().await.clone()
74 }
75
76 pub async fn memory_usage(&self) -> u64 {
78 self.stats.read().await.memory_usage_bytes
79 }
80
81 pub async fn cleanup(&self) -> Result<()> {
83 if let Some(threshold_mb) = self.config.memory_threshold_mb {
84 let current_usage_mb = self.memory_usage().await / (1024 * 1024);
85 if current_usage_mb > threshold_mb as u64 {
86 self.search_cache.clear().await;
88
89 }
92 }
93 Ok(())
94 }
95
96 pub async fn get_performance_metrics(&self) -> lumosai_vector_core::PerformanceMetrics {
98 self.performance_monitor.get_metrics().await
99 }
100
101 pub async fn get_cache_stats(&self) -> lumosai_vector_core::CacheStats {
103 self.search_cache.get_stats().await
104 }
105}
106
107#[async_trait]
108impl VectorStorage for MemoryVectorStorage {
109 type Config = MemoryConfig;
110
111 async fn create_index(&self, config: IndexConfig) -> Result<()> {
112 let mut indexes = self.indexes.write().await;
113
114 if indexes.contains_key(&config.name) {
115 return Err(VectorError::index_already_exists(&config.name));
116 }
117
118 let index = MemoryIndex::new(config.clone(), &self.config)?;
119 indexes.insert(config.name.clone(), index);
120
121 let mut stats = self.stats.write().await;
123 stats.index_count += 1;
124
125 Ok(())
126 }
127
128 async fn list_indexes(&self) -> Result<Vec<String>> {
129 let indexes = self.indexes.read().await;
130 Ok(indexes.keys().cloned().collect())
131 }
132
133 async fn describe_index(&self, index_name: &str) -> Result<IndexInfo> {
134 let indexes = self.indexes.read().await;
135 let index = indexes.get(index_name)
136 .ok_or_else(|| VectorError::index_not_found(index_name))?;
137
138 Ok(index.get_info())
139 }
140
141 async fn delete_index(&self, index_name: &str) -> Result<()> {
142 let mut indexes = self.indexes.write().await;
143
144 if !indexes.contains_key(index_name) {
145 return Err(VectorError::index_not_found(index_name));
146 }
147
148 let removed_index = indexes.remove(index_name).unwrap();
149
150 let mut stats = self.stats.write().await;
152 stats.index_count -= 1;
153 stats.total_vectors -= removed_index.vector_count();
154 stats.memory_usage_bytes -= removed_index.memory_usage();
155
156 Ok(())
157 }
158
159 async fn upsert_documents(&self, index_name: &str, documents: Vec<Document>) -> Result<Vec<DocumentId>> {
160 let mut indexes = self.indexes.write().await;
161 let index = indexes.get_mut(index_name)
162 .ok_or_else(|| VectorError::index_not_found(index_name))?;
163
164 let mut document_ids = Vec::new();
165 let mut vectors_added = 0;
166 let mut memory_added = 0;
167
168 for document in documents {
169 let embedding = document.embedding.as_ref()
170 .ok_or_else(|| VectorError::InvalidVector("Document must have embedding".to_string()))?;
171
172 if embedding.len() != index.dimension() {
173 return Err(VectorError::dimension_mismatch(index.dimension(), embedding.len()));
174 }
175
176 if let Some(max_vectors) = self.config.max_vectors_per_index {
178 if index.vector_count() >= max_vectors {
179 return Err(VectorError::ResourceLimitExceeded(
180 format!("Index {} has reached maximum capacity of {} vectors", index_name, max_vectors)
181 ));
182 }
183 }
184
185 let was_new = index.upsert_document(document.clone())?;
186 document_ids.push(document.id.clone());
187
188 if was_new {
189 vectors_added += 1;
190 memory_added += index.estimate_document_memory(&document);
191 }
192 }
193
194 let mut stats = self.stats.write().await;
196 stats.total_vectors += vectors_added;
197 stats.memory_usage_bytes += memory_added;
198
199 Ok(document_ids)
200 }
201
202 async fn search(&self, request: SearchRequest) -> Result<SearchResponse> {
203 let start_time = Instant::now();
204
205 let cache_key = format!("{}_{}_{}",
207 request.index_name,
208 request.top_k,
209 serde_json::to_string(&request.query).unwrap_or_default()
210 );
211
212 if let Some(cached_response) = self.search_cache.get(&cache_key).await {
214 let duration = start_time.elapsed();
215 self.performance_monitor.record_operation(duration, true).await;
216 return Ok(cached_response);
217 }
218
219 let indexes = self.indexes.read().await;
220 let index = indexes.get(&request.index_name)
221 .ok_or_else(|| VectorError::index_not_found(&request.index_name))?;
222
223 let results = index.search(&request)?;
224
225 let execution_time_ms = start_time.elapsed().as_millis() as u64;
226 let duration = start_time.elapsed();
227
228 {
230 let mut stats = self.stats.write().await;
231 stats.search_count += 1;
232 stats.total_search_time_ms += execution_time_ms;
233 }
234
235 let response = SearchResponse::new(results)
236 .with_execution_time(execution_time_ms);
237
238 self.search_cache.set(cache_key, response.clone()).await;
240
241 self.performance_monitor.record_operation(duration, true).await;
243
244 Ok(response)
245 }
246
247 async fn update_document(&self, index_name: &str, document: Document) -> Result<()> {
248 let mut indexes = self.indexes.write().await;
249 let index = indexes.get_mut(index_name)
250 .ok_or_else(|| VectorError::index_not_found(index_name))?;
251
252 if let Some(embedding) = &document.embedding {
253 if embedding.len() != index.dimension() {
254 return Err(VectorError::dimension_mismatch(index.dimension(), embedding.len()));
255 }
256 }
257
258 index.update_document(document)?;
259 Ok(())
260 }
261
262 async fn delete_documents(&self, index_name: &str, ids: Vec<DocumentId>) -> Result<()> {
263 let mut indexes = self.indexes.write().await;
264 let index = indexes.get_mut(index_name)
265 .ok_or_else(|| VectorError::index_not_found(index_name))?;
266
267 let mut vectors_removed = 0;
268 let mut memory_freed = 0;
269
270 for id in ids {
271 if let Some(removed_doc) = index.delete_document(&id)? {
272 vectors_removed += 1;
273 memory_freed += index.estimate_document_memory(&removed_doc);
274 }
275 }
276
277 let mut stats = self.stats.write().await;
279 stats.total_vectors -= vectors_removed;
280 stats.memory_usage_bytes = stats.memory_usage_bytes.saturating_sub(memory_freed);
281
282 Ok(())
283 }
284
285 async fn get_documents(&self, index_name: &str, ids: Vec<DocumentId>, include_vectors: bool) -> Result<Vec<Document>> {
286 let indexes = self.indexes.read().await;
287 let index = indexes.get(index_name)
288 .ok_or_else(|| VectorError::index_not_found(index_name))?;
289
290 let mut documents = Vec::new();
291 for id in ids {
292 if let Some(mut document) = index.get_document(&id)? {
293 if !include_vectors {
294 document.embedding = None;
295 }
296 documents.push(document);
297 }
298 }
299
300 Ok(documents)
301 }
302
303 async fn health_check(&self) -> Result<()> {
304 let _indexes = self.indexes.read().await;
306 let _stats = self.stats.read().await;
307
308 if let Some(threshold_mb) = self.config.memory_threshold_mb {
310 let current_usage_mb = self.memory_usage().await / (1024 * 1024);
311 if current_usage_mb > threshold_mb as u64 * 2 {
312 return Err(VectorError::ResourceLimitExceeded(
313 format!("Memory usage {} MB exceeds critical threshold", current_usage_mb)
314 ));
315 }
316 }
317
318 Ok(())
319 }
320
321 fn backend_info(&self) -> BackendInfo {
322 BackendInfo::new("memory", env!("CARGO_PKG_VERSION"))
323 .with_feature("high_performance")
324 .with_feature("thread_safe")
325 .with_feature("complex_filtering")
326 .with_feature("multiple_metrics")
327 .with_metadata("initial_capacity", MetadataValue::Integer(self.config.initial_capacity as i64))
328 .with_metadata("approximate_search", MetadataValue::Boolean(self.config.enable_approximate))
329 }
330}
331
332impl Clone for MemoryVectorStorage {
333 fn clone(&self) -> Self {
334 Self {
335 config: self.config.clone(),
336 indexes: Arc::clone(&self.indexes),
337 stats: Arc::clone(&self.stats),
338 performance_monitor: Arc::clone(&self.performance_monitor),
339 search_cache: Arc::clone(&self.search_cache),
340 }
341 }
342}
343
344impl Clone for StorageStats {
345 fn clone(&self) -> Self {
346 Self {
347 index_count: self.index_count,
348 total_vectors: self.total_vectors,
349 memory_usage_bytes: self.memory_usage_bytes,
350 search_count: self.search_count,
351 total_search_time_ms: self.total_search_time_ms,
352 }
353 }
354}