//! vectrust_storage/optimized.rs — optimized storage format (v2).

1use async_trait::async_trait;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use tokio::fs;
5use tokio::sync::RwLock;
6use vectrust_core::*;
7use uuid::Uuid;
8use serde::{Serialize, Deserialize};
9use rocksdb::{DB, Options};
10use memmap2::{MmapMut, MmapOptions};
11use std::fs::OpenOptions;
12use std::io::{Seek, SeekFrom, Write};
13use bincode;
14
/// Optimized storage format (v2) with better performance.
///
/// Item metadata and vector-index records live in RocksDB; the raw vector
/// payloads live in a single memory-mapped file (`vectors.dat`). All handles
/// are created lazily, hence the `Option`s behind `RwLock`s.
pub struct OptimizedStorage {
    // Root directory of the index on disk.
    path: PathBuf,
    // RocksDB handle; None until storage is initialized.
    db: Arc<RwLock<Option<DB>>>,
    // Open handle to vectors.dat (kept alive alongside the mmap).
    vector_file: Arc<RwLock<Option<std::fs::File>>>,
    // Writable memory map over vectors.dat.
    vector_mmap: Arc<RwLock<Option<MmapMut>>>,
    // In-memory copy of manifest.json.
    manifest: Arc<RwLock<Option<Manifest>>>,
    // Vector dimensionality; fixed by the first inserted item.
    dimensions: Arc<RwLock<Option<usize>>>,
    // Performance optimization: batch manifest updates
    manifest_dirty: Arc<RwLock<bool>>,
    // Operations since the manifest was last persisted (see MANIFEST_SAVE_INTERVAL).
    operations_since_save: Arc<RwLock<u32>>,
}
27
/// On-disk index manifest, serialized as pretty JSON to `manifest.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
    // Storage format version (2 for this "optimized" format).
    pub version: u32,
    // Format name, e.g. "optimized".
    pub format: String,
    pub created_at: chrono::DateTime<chrono::Utc>,
    // None until the first item fixes the dimensionality.
    pub dimensions: Option<usize>,
    pub distance_metric: DistanceMetric,
    // Count of live (non-deleted) items.
    pub total_items: usize,
    // Logical size of vectors.dat in bytes (equals next_vector_offset).
    pub vector_file_size: u64,
    // Byte offset where the next vector record will be written.
    pub next_vector_offset: u64,
}
39
/// Index record (stored bincode-encoded in the vector_index column family)
/// locating one vector inside the memory-mapped vector file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VectorRecord {
    pub id: Uuid,
    // Byte offset of the record (header + payload) inside vectors.dat.
    pub offset: u64,
    pub dimensions: usize,
    // Soft-delete flag; the bytes stay in the file until compaction.
    pub deleted: bool,
}
47
// Column family holding item metadata (JSON, with the vector stripped out).
const METADATA_CF: &str = "metadata";
// Column family mapping item id -> bincode-encoded VectorRecord.
const VECTOR_INDEX_CF: &str = "vector_index";
const VECTOR_HEADER_SIZE: usize = 8; // u64 for dimensions count

const MANIFEST_SAVE_INTERVAL: u32 = 100; // Save manifest every N operations
53
54impl OptimizedStorage {
55    pub fn new(path: &Path) -> Result<Self> {
56        Ok(Self {
57            path: path.to_path_buf(),
58            db: Arc::new(RwLock::new(None)),
59            vector_file: Arc::new(RwLock::new(None)),
60            vector_mmap: Arc::new(RwLock::new(None)),
61            manifest: Arc::new(RwLock::new(None)),
62            dimensions: Arc::new(RwLock::new(None)),
63            manifest_dirty: Arc::new(RwLock::new(false)),
64            operations_since_save: Arc::new(RwLock::new(0)),
65        })
66    }
67    
68    async fn initialize_storage(&self) -> Result<()> {
69        // Create directory if it doesn't exist
70        if !self.path.exists() {
71            std::fs::create_dir_all(&self.path)?;
72        }
73        
74        // Open RocksDB with optimized settings for vector workloads
75        let db_path = self.path.join("metadata");
76        let mut db_opts = Options::default();
77        db_opts.create_if_missing(true);
78        db_opts.create_missing_column_families(true);
79        
80        // Performance optimizations for vector operations
81        db_opts.set_max_write_buffer_number(4);
82        db_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
83        db_opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB
84        db_opts.set_level_compaction_dynamic_level_bytes(true);
85        db_opts.set_max_bytes_for_level_base(256 * 1024 * 1024); // 256MB
86        db_opts.set_max_background_jobs(4);
87        db_opts.set_bytes_per_sync(64 * 1024 * 1024); // 64MB - sync less frequently
88        
89        // Note: We're not disabling auto-compactions as it can cause issues
90        
91        let cf_names = vec![METADATA_CF, VECTOR_INDEX_CF];
92        let db = DB::open_cf(&db_opts, db_path, cf_names)?;
93        
94        *self.db.write().await = Some(db);
95        
96        // Load or create manifest
97        if let Some(manifest) = self.load_manifest().await? {
98            *self.manifest.write().await = Some(manifest.clone());
99            *self.dimensions.write().await = manifest.dimensions;
100            
101            // Open existing vector file
102            let vector_path = self.path.join("vectors.dat");
103            if vector_path.exists() {
104                let file = OpenOptions::new()
105                    .read(true)
106                    .write(true)
107                    .open(&vector_path)?;
108                
109                let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
110                
111                *self.vector_file.write().await = Some(file);
112                *self.vector_mmap.write().await = Some(mmap);
113            }
114        }
115        
116        Ok(())
117    }
118    
119    async fn create_vector_file(&self, initial_size: u64) -> Result<()> {
120        let vector_path = self.path.join("vectors.dat");
121        
122        let mut file = OpenOptions::new()
123            .read(true)
124            .write(true)
125            .create(true)
126            .truncate(true)
127            .open(&vector_path)?;
128            
129        // Pre-allocate space
130        file.seek(SeekFrom::Start(initial_size - 1))?;
131        file.write_all(&[0])?;
132        file.flush()?;
133        
134        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
135        
136        *self.vector_file.write().await = Some(file);
137        *self.vector_mmap.write().await = Some(mmap);
138        
139        Ok(())
140    }
141    
142    async fn write_vector_to_file(&self, vector: &[f32], offset: u64) -> Result<()> {
143        let mut mmap_guard = self.vector_mmap.write().await;
144        if let Some(ref mut mmap) = *mmap_guard {
145            let start = offset as usize;
146            let dimensions = vector.len();
147            
148            // Check bounds
149            let end_pos = start + VECTOR_HEADER_SIZE + (dimensions * 4);
150            let mmap_len = mmap.len();
151            if end_pos > mmap_len {
152                return Err(VectraError::StorageError {
153                    message: format!("Memory map too small: need {} bytes, have {} bytes", 
154                                   end_pos, mmap_len)
155                });
156            }
157            
158            // Write dimensions count first (8 bytes)
159            let dim_bytes = (dimensions as u64).to_le_bytes();
160            mmap[start..start + 8].copy_from_slice(&dim_bytes);
161            
162            // Write vector data (4 bytes per f32)
163            let vector_start = start + VECTOR_HEADER_SIZE;
164            for (i, &value) in vector.iter().enumerate() {
165                let value_bytes = value.to_le_bytes();
166                let pos = vector_start + (i * 4);
167                mmap[pos..pos + 4].copy_from_slice(&value_bytes);
168            }
169            
170            // Don't flush on every write - let OS handle it for better performance
171            // mmap.flush()?;
172        }
173        
174        Ok(())
175    }
176    
177    async fn read_vector_from_file(&self, offset: u64, expected_dims: usize) -> Result<Vec<f32>> {
178        let mmap_guard = self.vector_mmap.read().await;
179        if let Some(ref mmap) = *mmap_guard {
180            let start = offset as usize;
181            
182            // Read dimensions count
183            let mut dim_bytes = [0u8; 8];
184            dim_bytes.copy_from_slice(&mmap[start..start + 8]);
185            let dimensions = u64::from_le_bytes(dim_bytes) as usize;
186            
187            if dimensions != expected_dims {
188                return Err(VectraError::VectorValidation {
189                    message: format!("Dimension mismatch: expected {}, got {}", expected_dims, dimensions)
190                });
191            }
192            
193            // Read vector data
194            let mut vector = Vec::with_capacity(dimensions);
195            let vector_start = start + VECTOR_HEADER_SIZE;
196            
197            for i in 0..dimensions {
198                let pos = vector_start + (i * 4);
199                let mut value_bytes = [0u8; 4];
200                value_bytes.copy_from_slice(&mmap[pos..pos + 4]);
201                vector.push(f32::from_le_bytes(value_bytes));
202            }
203            
204            Ok(vector)
205        } else {
206            Err(VectraError::StorageError { 
207                message: "Vector file not initialized".to_string() 
208            })
209        }
210    }
211    
212    fn manifest_path(&self) -> PathBuf {
213        self.path.join("manifest.json")
214    }
215    
216    async fn load_manifest(&self) -> Result<Option<Manifest>> {
217        let manifest_path = self.manifest_path();
218        
219        if !manifest_path.exists() {
220            return Ok(None);
221        }
222        
223        let content = fs::read_to_string(manifest_path).await?;
224        let manifest: Manifest = serde_json::from_str(&content)?;
225        Ok(Some(manifest))
226    }
227    
228    async fn save_manifest_to_disk(&self, manifest: &Manifest) -> Result<()> {
229        let manifest_path = self.manifest_path();
230        
231        // Debug: Track manifest saves
232        static SAVE_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
233        let count = SAVE_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
234        if count % 10 == 0 {
235            println!("DEBUG: Manifest save #{}, total_items: {}", count + 1, manifest.total_items);
236        }
237        
238        // Ensure directory exists
239        if let Some(parent) = manifest_path.parent() {
240            fs::create_dir_all(parent).await?;
241        }
242        
243        let start = std::time::Instant::now();
244        let content = serde_json::to_string_pretty(manifest)?;
245        let json_time = start.elapsed();
246        
247        let start = std::time::Instant::now();
248        fs::write(manifest_path, content).await?;
249        let write_time = start.elapsed();
250        
251        if count % 10 == 0 {
252            println!("  JSON serialize: {} µs, Disk write: {} µs", 
253                     json_time.as_micros(), write_time.as_micros());
254        }
255        
256        Ok(())
257    }
258    
259    async fn save_manifest(&self, manifest: &Manifest) -> Result<()> {
260        self.save_manifest_to_disk(manifest).await?;
261        
262        // Update in-memory manifest
263        *self.manifest.write().await = Some(manifest.clone());
264        
265        Ok(())
266    }
267    
268    /// Mark manifest as dirty and potentially save it based on batching interval
269    async fn mark_manifest_dirty(&self) -> Result<()> {
270        *self.manifest_dirty.write().await = true;
271        
272        let mut ops_count = self.operations_since_save.write().await;
273        *ops_count += 1;
274        
275        // Save manifest every N operations for crash safety vs performance balance
276        if *ops_count >= MANIFEST_SAVE_INTERVAL {
277            drop(ops_count); // Release lock before calling save
278            self.flush_manifest_if_dirty().await?;
279        }
280        
281        Ok(())
282    }
283    
284    /// Save manifest to disk if it's been modified, reset dirty flag
285    async fn flush_manifest_if_dirty(&self) -> Result<()> {
286        let is_dirty = {
287            let dirty = self.manifest_dirty.read().await;
288            *dirty
289        };
290        
291        if is_dirty {
292            let manifest = {
293                let manifest_guard = self.manifest.read().await;
294                manifest_guard.clone()
295            };
296            
297            if let Some(manifest) = manifest {
298                self.save_manifest_to_disk(&manifest).await?;
299                *self.manifest_dirty.write().await = false;
300                *self.operations_since_save.write().await = 0;
301            }
302        }
303        
304        Ok(())
305    }
306    
307    async fn ensure_vector_file_capacity(&self, needed_size: u64) -> Result<()> {
308        let vector_path = self.path.join("vectors.dat");
309        
310        // Check if file exists first
311        if !vector_path.exists() {
312            return Err(VectraError::StorageError {
313                message: "Vector file does not exist".to_string()
314            });
315        }
316        
317        // Get current file size
318        let metadata = tokio::fs::metadata(&vector_path).await?;
319        let current_size = metadata.len();
320        
321        if needed_size > current_size {
322            // Calculate new size with some headroom (grow by at least 50% or to needed size + 10MB)
323            let new_size = std::cmp::max(
324                (current_size as f64 * 1.5) as u64,
325                needed_size + (10 * 1024 * 1024) // Add 10MB buffer
326            );
327            
328            println!("    📈 Growing vector file from {} MB to {} MB", 
329                     current_size / 1024 / 1024, 
330                     new_size / 1024 / 1024);
331            
332            // We need to recreate the memory map with a larger file
333            // First, flush and drop the current mmap
334            {
335                let mut mmap_guard = self.vector_mmap.write().await;
336                if let Some(ref mut mmap) = *mmap_guard {
337                    mmap.flush()?;
338                }
339                *mmap_guard = None; // Drop the old mmap
340            }
341            
342            // Also drop the file handle
343            {
344                let mut file_guard = self.vector_file.write().await;
345                *file_guard = None;
346            }
347            
348            // Resize the file
349            let file = OpenOptions::new()
350                .read(true)
351                .write(true)
352                .open(&vector_path)?;
353            
354            file.set_len(new_size)?;
355            file.sync_all()?; // Ensure the resize is flushed
356            
357            // Create new memory map
358            let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
359            
360            // Update the handles
361            *self.vector_file.write().await = Some(file);
362            *self.vector_mmap.write().await = Some(mmap);
363            
364            println!("    ✅ Vector file resized successfully");
365        }
366        
367        Ok(())
368    }
369    
370    async fn get_next_vector_offset(&self, vector_size: usize) -> Result<u64> {
371        let current_offset = {
372            let mut manifest_guard = self.manifest.write().await;
373            if let Some(ref mut manifest) = *manifest_guard {
374                let current_offset = manifest.next_vector_offset;
375                let record_size = VECTOR_HEADER_SIZE + (vector_size * 4); // header + 4 bytes per f32
376                manifest.next_vector_offset += record_size as u64;
377                manifest.vector_file_size = manifest.next_vector_offset;
378                current_offset
379            } else {
380                return Err(VectraError::StorageError {
381                    message: "Manifest not initialized".to_string()
382                });
383            }
384        };
385        
386        // Don't mark dirty here - let the caller decide when to mark dirty
387        Ok(current_offset)
388    }
389    
390    async fn get_next_vector_offset_and_mark_dirty(&self, vector_size: usize) -> Result<u64> {
391        let offset = self.get_next_vector_offset(vector_size).await?;
392        self.mark_manifest_dirty().await?;
393        Ok(offset)
394    }
395    
396    /// Ensure all pending changes are flushed to disk
397    pub async fn flush(&self) -> Result<()> {
398        // Flush manifest
399        self.flush_manifest_if_dirty().await?;
400        
401        // Flush memory-mapped file if it exists
402        if let Some(ref mmap) = *self.vector_mmap.read().await {
403            mmap.flush()?;
404        }
405        
406        // Flush RocksDB
407        if let Some(ref db) = *self.db.read().await {
408            db.flush()?;
409        }
410        
411        Ok(())
412    }
413}
414
415#[async_trait]
416impl StorageBackend for OptimizedStorage {
417    async fn exists(&self) -> bool {
418        self.manifest_path().exists()
419    }
420    
    /// Create a fresh v2 index on disk: write the manifest, open RocksDB, and
    /// pre-allocate the vector file.
    ///
    /// Fails with `IndexAlreadyExists` when a manifest is already present and
    /// `delete_if_exists` is false; otherwise any existing index directory is
    /// wiped first.
    async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()> {
        let manifest_path = self.manifest_path();

        if manifest_path.exists() && !config.delete_if_exists {
            return Err(VectraError::IndexAlreadyExists { 
                path: manifest_path.to_string_lossy().to_string() 
            });
        }

        // Clean up existing files if delete_if_exists is true.
        // (.ok() is deliberate: removal is best-effort.)
        if config.delete_if_exists && self.path.exists() {
            fs::remove_dir_all(&self.path).await.ok();
        }

        // Dimensions start as None and are fixed by the first inserted item.
        let manifest = Manifest {
            version: 2,
            format: "optimized".to_string(),
            created_at: chrono::Utc::now(),
            dimensions: None,
            distance_metric: config.distance_metric.clone(),
            total_items: 0,
            vector_file_size: 0,
            next_vector_offset: 0,
        };

        self.save_manifest(&manifest).await?;

        // Initialize storage components (RocksDB; re-loads the manifest just saved).
        self.initialize_storage().await?;

        // Make sure manifest is persisted for create operations.
        self.flush_manifest_if_dirty().await?;

        // Create initial vector file (10MB initial size - reasonable for ~6.5K vectors with 384 dims)
        // File will grow dynamically as needed
        self.create_vector_file(10 * 1024 * 1024).await?;

        Ok(())
    }
460    
461    async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
462        let db_guard = self.db.read().await;
463        if let Some(ref db) = *db_guard {
464            let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
465            let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
466            
467            let id_bytes = id.as_bytes();
468            
469            // Get metadata
470            if let Some(metadata_bytes) = db.get_cf(metadata_cf, id_bytes)? {
471                let mut item: VectorItem = serde_json::from_slice(&metadata_bytes)?;
472                
473                // Get vector record
474                if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
475                    let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
476                    
477                    if !vector_record.deleted {
478                        // Read vector from memory-mapped file
479                        item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
480                        return Ok(Some(item));
481                    }
482                }
483            }
484        }
485        
486        Ok(None)
487    }
488    
489    async fn insert_item(&mut self, item: &VectorItem) -> Result<()> {
490        // Ensure storage is initialized
491        if self.db.read().await.is_none() {
492            self.initialize_storage().await?;
493        }
494        
495        // Add timing for debugging
496        let start_total = std::time::Instant::now();
497        
498        // Debug: count inserts
499        static DEBUG_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
500        let debug_count = DEBUG_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
501        if debug_count % 50 == 0 {
502            println!("DEBUG: insert_item called {} times", debug_count + 1);
503        }
504        
505        let dimensions = item.vector.len();
506        
507        // Set dimensions if this is the first item
508        let mut needs_manifest_update = false;
509        {
510            let mut dims_guard = self.dimensions.write().await;
511            if dims_guard.is_none() {
512                *dims_guard = Some(dimensions);
513                needs_manifest_update = true;
514            } else if let Some(existing_dims) = *dims_guard {
515                if existing_dims != dimensions {
516                    return Err(VectraError::VectorValidation {
517                        message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, dimensions)
518                    });
519                }
520            }
521        }
522        
523        // Update manifest outside of the dimensions lock to avoid deadlock
524        if needs_manifest_update {
525            let mut manifest_guard = self.manifest.write().await;
526            if let Some(ref mut manifest) = *manifest_guard {
527                manifest.dimensions = Some(dimensions);
528                // Save manifest to disk only, we already have the in-memory version updated
529                self.save_manifest_to_disk(manifest).await?;
530            }
531        }
532        
533        // Get offset for vector storage
534        let start = std::time::Instant::now();
535        let vector_offset = self.get_next_vector_offset_and_mark_dirty(dimensions).await?;
536        let offset_time = start.elapsed();
537        
538        // Write vector to memory-mapped file
539        let start = std::time::Instant::now();
540        self.write_vector_to_file(&item.vector, vector_offset).await?;
541        let mmap_time = start.elapsed();
542        
543        // Store metadata and vector record in RocksDB
544        let db_guard = self.db.read().await;
545        if let Some(ref db) = *db_guard {
546            let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
547            let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
548            
549            let id_bytes = item.id.as_bytes();
550            
551            // Store metadata (without vector data) using JSON to handle serde_json::Value
552            let mut metadata_item = item.clone();
553            metadata_item.vector = Vec::new(); // Don't store vector in metadata
554            let metadata_bytes = serde_json::to_vec(&metadata_item)?;
555            // Use write options for better performance
556            let mut write_opts = rocksdb::WriteOptions::default();
557            write_opts.disable_wal(true); // Disable WAL for better performance
558            
559            let start = std::time::Instant::now();
560            db.put_cf_opt(metadata_cf, id_bytes, metadata_bytes, &write_opts)?;
561            
562            // Store vector record
563            let vector_record = VectorRecord {
564                id: item.id,
565                offset: vector_offset,
566                dimensions,
567                deleted: false,
568            };
569            let vector_record_bytes = bincode::serialize(&vector_record)?;
570            db.put_cf_opt(vector_index_cf, id_bytes, vector_record_bytes, &write_opts)?;
571            let db_time = start.elapsed();
572            
573            // Update manifest and log timing
574            let total_items = {
575                let mut manifest_guard = self.manifest.write().await;
576                if let Some(ref mut manifest) = *manifest_guard {
577                    manifest.total_items += 1;
578                    manifest.total_items
579                } else {
580                    return Err(VectraError::StorageError {
581                        message: "Manifest not initialized".to_string()
582                    });
583                }
584            };
585            
586            // Mark manifest dirty for batched saving
587            self.mark_manifest_dirty().await?;
588            
589            // Log timing periodically based on total_items
590            if total_items == 50 || total_items == 100 || total_items == 200 || total_items == 500 || total_items == 1000 {
591                let total_time = start_total.elapsed();
592                println!("    Storage: After {} items, insert took {} µs (offset: {} µs, mmap: {} µs, db: {} µs, rest: {} µs)",
593                         total_items,
594                         total_time.as_micros(),
595                         offset_time.as_micros(),
596                         mmap_time.as_micros(),
597                         db_time.as_micros(),
598                         total_time.as_micros() - offset_time.as_micros() - mmap_time.as_micros() - db_time.as_micros());
599            }
600        }
601        
602        Ok(())
603    }
604    
605    async fn insert_items(&mut self, items: &[VectorItem]) -> Result<()> {
606        if items.is_empty() {
607            return Ok(());
608        }
609        
610        // Ensure storage is initialized
611        if self.db.read().await.is_none() {
612            self.initialize_storage().await?;
613        }
614        
615        // Validate all items have same dimensions
616        let first_dimensions = items[0].vector.len();
617        for item in items {
618            if item.vector.len() != first_dimensions {
619                return Err(VectraError::VectorValidation {
620                    message: format!("All vectors must have same dimensions. Expected {}, got {}", first_dimensions, item.vector.len())
621                });
622            }
623        }
624        
625        // Set dimensions if this is the first batch
626        let mut needs_manifest_update = false;
627        {
628            let mut dims_guard = self.dimensions.write().await;
629            if dims_guard.is_none() {
630                *dims_guard = Some(first_dimensions);
631                needs_manifest_update = true;
632            } else if let Some(existing_dims) = *dims_guard {
633                if existing_dims != first_dimensions {
634                    return Err(VectraError::VectorValidation {
635                        message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, first_dimensions)
636                    });
637                }
638            }
639        }
640        
641        // Update manifest dimensions if needed
642        if needs_manifest_update {
643            let mut manifest_guard = self.manifest.write().await;
644            if let Some(ref mut manifest) = *manifest_guard {
645                manifest.dimensions = Some(first_dimensions);
646                self.save_manifest_to_disk(manifest).await?;
647            }
648        }
649        
650        // Calculate total space needed for all vectors
651        let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
652        let total_space_needed = items.len() * record_size;
653        
654        // Pre-check and grow file if needed before any writes
655        {
656            let manifest = self.manifest.read().await;
657            if let Some(ref m) = *manifest {
658                let current_offset = m.next_vector_offset;
659                let final_size = current_offset + total_space_needed as u64;
660                drop(manifest); // Release the lock before calling ensure_vector_file_capacity
661                
662                // Only print for large batches
663                if items.len() >= 2500 {
664                    println!("DEBUG: Batch size: {}, Current offset: {}, Final size needed: {}", 
665                             items.len(), current_offset, final_size);
666                }
667                
668                self.ensure_vector_file_capacity(final_size).await?;
669            }
670        }
671        
672        // Pre-allocate ALL vector offsets at once to avoid repeated lock acquisition
673        let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
674        let mut offsets = Vec::with_capacity(items.len());
675        {
676            let mut manifest_guard = self.manifest.write().await;
677            if let Some(ref mut manifest) = *manifest_guard {
678                let mut current_offset = manifest.next_vector_offset;
679                for _ in 0..items.len() {
680                    offsets.push(current_offset);
681                    current_offset += record_size as u64;
682                }
683                manifest.next_vector_offset = current_offset;
684                manifest.vector_file_size = current_offset;
685            } else {
686                return Err(VectraError::StorageError {
687                    message: "Manifest not initialized".to_string()
688                });
689            }
690        }
691        
692        // Now write vectors and prepare data without repeated lock acquisition
693        let mut prepared_data = Vec::with_capacity(items.len());
694        for (idx, item) in items.iter().enumerate() {
695            let vector_offset = offsets[idx];
696            self.write_vector_to_file(&item.vector, vector_offset).await?;
697            
698            // Prepare metadata (without vector data) using JSON
699            let mut metadata_item = item.clone();
700            metadata_item.vector = Vec::new();
701            let metadata_bytes = serde_json::to_vec(&metadata_item)?;
702            
703            // Prepare vector record
704            let vector_record = VectorRecord {
705                id: item.id,
706                offset: vector_offset,
707                dimensions: first_dimensions,
708                deleted: false,
709            };
710            let vector_record_bytes = bincode::serialize(&vector_record)?;
711            
712            prepared_data.push((item.id.as_bytes().to_vec(), metadata_bytes, vector_record_bytes));
713        }
714        
715        // Bulk write to database
716        let total_items_added = prepared_data.len();
717        {
718            let db_guard = self.db.read().await;
719            if let Some(ref db) = *db_guard {
720                let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
721                let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
722                
723                // Use RocksDB write batch for better performance
724                let mut batch = rocksdb::WriteBatch::default();
725                
726                for (id_bytes, metadata_bytes, vector_record_bytes) in prepared_data {
727                    batch.put_cf(metadata_cf, &id_bytes, metadata_bytes);
728                    batch.put_cf(vector_index_cf, &id_bytes, vector_record_bytes);
729                }
730                
731                // Execute batch write
732                db.write(batch)?;
733            }
734        }
735        
736        // Update manifest
737        {
738            let mut manifest_guard = self.manifest.write().await;
739            if let Some(ref mut manifest) = *manifest_guard {
740                manifest.total_items += total_items_added;
741            }
742        }
743        
744        // Mark manifest dirty for batched saving
745        self.mark_manifest_dirty().await?;
746        
747        Ok(())
748    }
749    
    /// Replace an item by id, implemented as delete followed by insert.
    ///
    /// NOTE(review): this is NOT atomic — if the insert fails after the
    /// delete succeeds, the item is lost. Consider an in-place update path.
    async fn update_item(&mut self, item: &VectorItem) -> Result<()> {
        // For now, implement as delete + insert
        self.delete_item(&item.id).await?;
        self.insert_item(item).await?;
        Ok(())
    }
756    
757    async fn delete_item(&mut self, id: &Uuid) -> Result<()> {
758        let db_guard = self.db.read().await;
759        if let Some(ref db) = *db_guard {
760            let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
761            let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
762            
763            let id_bytes = id.as_bytes();
764            
765            // Mark vector record as deleted (we don't actually remove from file for now)
766            if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
767                let mut vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
768                vector_record.deleted = true;
769                let updated_bytes = bincode::serialize(&vector_record)?;
770                db.put_cf(vector_index_cf, id_bytes, updated_bytes)?;
771            }
772            
773            // Remove metadata
774            db.delete_cf(metadata_cf, id_bytes)?;
775            
776            // Update manifest
777            let should_mark_dirty = {
778                let mut manifest_guard = self.manifest.write().await;
779                if let Some(ref mut manifest) = *manifest_guard {
780                    if manifest.total_items > 0 {
781                        manifest.total_items -= 1;
782                        true
783                    } else {
784                        false
785                    }
786                } else {
787                    false
788                }
789            };
790            
791            // Mark manifest dirty for batched saving if updated
792            if should_mark_dirty {
793                self.mark_manifest_dirty().await?;
794            }
795        }
796        
797        Ok(())
798    }
799    
    /// Returns all live (non-tombstoned) items, optionally capped by
    /// `options.limit`.
    ///
    /// Runs in two phases so no RocksDB guard is held while awaiting vector
    /// reads: first metadata and index records are collected under the DB
    /// read guard, then vector payloads are loaded from the vector file.
    async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
        // Lazily open the storage for read-only callers.
        if self.db.read().await.is_none() {
            self.initialize_storage().await?;
        }

        // Phase 1: collect (metadata, vector-index record) pairs. Vector
        // payloads are deliberately NOT read inside this scope.
        let metadata_records = {
            let db_guard = self.db.read().await;
            if let Some(ref db) = *db_guard {
                let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
                let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
                
                let mut records = Vec::new();
                let iter = db.iterator_cf(metadata_cf, rocksdb::IteratorMode::Start);
                
                for item in iter {
                    let (key, value) = item?;
                    
                    // The vector-index record carries the `deleted` tombstone
                    // flag; skip entries that were logically removed.
                    if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, &key)? {
                        let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
                        
                        if !vector_record.deleted {
                            let metadata_item: VectorItem = serde_json::from_slice(&value)?;
                            records.push((metadata_item, vector_record));
                            
                            // Stop early once the requested limit is reached.
                            // NOTE(review): only `limit` is honored here; any
                            // other ListOptions fields (e.g. an offset) are
                            // ignored — confirm against ListOptions' definition.
                            if let Some(ref opts) = options {
                                if let Some(limit) = opts.limit {
                                    if records.len() >= limit {
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
                
                records
            } else {
                Vec::new()
            }
        };
        
        // Phase 2: load vector payloads from the vector data file without
        // holding the DB guard across awaits.
        let mut items = Vec::new();
        for (mut metadata_item, vector_record) in metadata_records {
            metadata_item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
            items.push(metadata_item);
        }
        
        Ok(items)
    }
855    
856    async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>> {
857        if let Some(ref query_vector) = query.vector {
858            // For now, implement basic brute-force similarity search
859            // In a real implementation, this would use HNSW index
860            let all_items = self.list_items(None).await?;
861            let mut results = Vec::new();
862            
863            for item in all_items {
864                if item.vector.len() == query_vector.len() {
865                    let similarity = VectorOps::cosine_similarity(query_vector, &item.vector);
866                    results.push(QueryResult {
867                        item,
868                        score: similarity,
869                    });
870                }
871            }
872            
873            // Sort by similarity (descending)
874            results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
875            
876            // Apply limit
877            results.truncate(query.top_k);
878            
879            Ok(results)
880        } else {
881            Ok(Vec::new())
882        }
883    }
884    
    /// Begins a storage transaction.
    ///
    /// Currently a no-op: writes rely on RocksDB's per-write-batch atomicity
    /// instead of staged state. A full implementation would open an explicit
    /// RocksDB transaction here and buffer changes until commit/rollback.
    async fn begin_transaction(&mut self) -> Result<()> {
        Ok(())
    }
890    
891    async fn commit_transaction(&mut self) -> Result<()> {
892        // Flush manifest to disk
893        self.flush_manifest_if_dirty().await?;
894
895        // Flush any pending writes
896        let db_guard = self.db.read().await;
897        if let Some(ref db) = *db_guard {
898            db.flush()?;
899        }
900
901        if let Some(ref mut mmap_guard) = *self.vector_mmap.write().await {
902            mmap_guard.flush()?;
903        }
904
905        Ok(())
906    }
907    
    /// Rolls back the current "transaction".
    ///
    /// Currently a no-op that always succeeds: since `begin_transaction` does
    /// not stage anything, there is nothing to undo. A full implementation
    /// would discard pending changes here.
    async fn rollback_transaction(&mut self) -> Result<()> {
        Ok(())
    }
913    
914    async fn delete_index(&mut self) -> Result<()> {
915        // Close database and memory map first
916        *self.db.write().await = None;
917        *self.vector_file.write().await = None;
918        *self.vector_mmap.write().await = None;
919        *self.manifest.write().await = None;
920        
921        // Remove all files in the index directory
922        if self.path.exists() {
923            fs::remove_dir_all(&self.path).await?;
924        }
925        Ok(())
926    }
927    
928    async fn get_stats(&self) -> Result<IndexStats> {
929        if let Some(manifest) = self.load_manifest().await? {
930            let size = if self.path.exists() {
931                // Calculate directory size
932                let mut total_size = 0u64;
933                let mut entries = fs::read_dir(&self.path).await?;
934                while let Some(entry) = entries.next_entry().await? {
935                    if let Ok(metadata) = entry.metadata().await {
936                        total_size += metadata.len();
937                    }
938                }
939                total_size
940            } else {
941                0
942            };
943            
944            Ok(IndexStats {
945                items: manifest.total_items,
946                size,
947                dimensions: manifest.dimensions,
948                distance_metric: manifest.distance_metric,
949            })
950        } else {
951            Ok(IndexStats {
952                items: 0,
953                size: 0,
954                dimensions: None,
955                distance_metric: DistanceMetric::Cosine,
956            })
957        }
958    }
959}
960
961#[cfg(test)]
962mod tests {
963    use super::*;
964    use vectrust_core::StorageBackend;
965    use tempfile::TempDir;
966    
967    #[tokio::test]
968    async fn test_optimized_storage_creation() {
969        let temp_dir = TempDir::new().unwrap();
970        let mut storage = OptimizedStorage::new(temp_dir.path()).unwrap();
971        
972        assert!(!storage.exists().await);
973        
974        let config = CreateIndexConfig::default();
975        storage.create_index(&config).await.unwrap();
976        
977        assert!(storage.exists().await);
978    }
979    
980    #[tokio::test]
981    async fn test_optimized_storage_insert_and_get() {
982        let temp_dir = TempDir::new().unwrap();
983        let mut storage = OptimizedStorage::new(temp_dir.path()).unwrap();
984        
985        let config = CreateIndexConfig::default();
986        storage.create_index(&config).await.unwrap();
987        
988        let item = VectorItem {
989            id: Uuid::new_v4(),
990            vector: vec![1.0, 0.0, 0.0],
991            metadata: serde_json::json!({"test": "data"}),
992            ..Default::default()
993        };
994        
995        storage.insert_item(&item).await.unwrap();
996        
997        let retrieved = storage.get_item(&item.id).await.unwrap();
998        assert!(retrieved.is_some());
999        
1000        let retrieved_item = retrieved.unwrap();
1001        assert_eq!(retrieved_item.id, item.id);
1002        assert_eq!(retrieved_item.vector, item.vector);
1003    }
1004    
1005    #[tokio::test]
1006    async fn test_optimized_storage_query() {
1007        let temp_dir = TempDir::new().unwrap();
1008        let mut storage = OptimizedStorage::new(temp_dir.path()).unwrap();
1009        
1010        let config = CreateIndexConfig::default();
1011        storage.create_index(&config).await.unwrap();
1012        
1013        let item1 = VectorItem {
1014            id: Uuid::new_v4(),
1015            vector: vec![1.0, 0.0, 0.0],
1016            metadata: serde_json::json!({"name": "item1"}),
1017            ..Default::default()
1018        };
1019        
1020        let item2 = VectorItem {
1021            id: Uuid::new_v4(),
1022            vector: vec![0.0, 1.0, 0.0],
1023            metadata: serde_json::json!({"name": "item2"}),
1024            ..Default::default()
1025        };
1026        
1027        storage.insert_item(&item1).await.unwrap();
1028        storage.insert_item(&item2).await.unwrap();
1029        
1030        let query = Query {
1031            vector: Some(vec![1.0, 0.1, 0.0]),
1032            text: None,
1033            top_k: 2,
1034            filter: None,
1035        };
1036        
1037        let results = storage.query_items(&query).await.unwrap();
1038        assert_eq!(results.len(), 2);
1039        
1040        // First result should be more similar to item1
1041        assert_eq!(results[0].item.id, item1.id);
1042        assert!(results[0].score > results[1].score);
1043    }
1044}