Skip to main content

vectrust_storage/
optimized.rs

1use async_trait::async_trait;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use tokio::fs;
5use tokio::sync::RwLock;
6use vectrust_core::*;
7use uuid::Uuid;
8use serde::{Serialize, Deserialize};
9use rocksdb::{DB, Options};
10use memmap2::{MmapMut, MmapOptions};
11use std::fs::OpenOptions;
12use std::io::{Seek, SeekFrom, Write};
13use bincode;
14
/// Optimized storage format (v2) with better performance
///
/// Every piece of runtime state lives in an `Arc<RwLock<Option<_>>>` slot that
/// `new` initializes to `None`; `initialize_storage`, `create_vector_file` and
/// `save_manifest` fill the slots in later.
pub struct OptimizedStorage {
    // Root directory of the index. `manifest.json`, `vectors.dat` and the
    // RocksDB directory `metadata` are all resolved relative to it.
    path: PathBuf,
    // RocksDB handle holding the `metadata` and `vector_index` column families.
    db: Arc<RwLock<Option<DB>>>,
    // Open handle to `vectors.dat`, kept alongside the mapping below.
    vector_file: Arc<RwLock<Option<std::fs::File>>>,
    // Writable memory map over `vectors.dat`; all vector reads/writes go through it.
    vector_mmap: Arc<RwLock<Option<MmapMut>>>,
    // In-memory copy of the manifest; may be newer than the copy on disk
    // while `manifest_dirty` is set.
    manifest: Arc<RwLock<Option<Manifest>>>,
    // Vector dimensionality fixed by the first insert (or loaded from the manifest).
    dimensions: Arc<RwLock<Option<usize>>>,
    // Performance optimization: batch manifest updates
    // `true` while the in-memory manifest has changes not yet written to disk.
    manifest_dirty: Arc<RwLock<bool>>,
    // Number of `mark_manifest_dirty` calls since the last batched save.
    operations_since_save: Arc<RwLock<u32>>,
}
27
/// Index-level bookkeeping persisted as JSON in `manifest.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
    /// Storage format version; `create_index` writes `2`.
    pub version: u32,
    /// Storage format name; `create_index` writes `"optimized"`.
    pub format: String,
    /// Timestamp taken when the index was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Dimensionality of stored vectors; `None` until the first insert.
    pub dimensions: Option<usize>,
    /// Distance metric copied from the index creation config.
    pub distance_metric: DistanceMetric,
    /// Item counter: incremented on insert, decremented on delete.
    pub total_items: usize,
    /// Set equal to `next_vector_offset` whenever offsets are reserved.
    pub vector_file_size: u64,
    /// Byte offset in `vectors.dat` at which the next vector record is placed.
    pub next_vector_offset: u64,
}
39
/// Per-item index entry stored in the `vector_index` column family, keyed by
/// the item's UUID bytes.
///
/// The entry is serialized with bincode.
// NOTE(review): presumably the field order is part of the on-disk format
// because of the bincode encoding — confirm before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VectorRecord {
    /// Id of the item this record belongs to.
    pub id: Uuid,
    /// Byte offset of the record (header + payload) inside `vectors.dat`.
    pub offset: u64,
    /// Number of `f32` components stored for this vector.
    pub dimensions: usize,
    /// Tombstone flag: set by `delete_item`; the bytes in `vectors.dat` stay in place.
    pub deleted: bool,
}
47
/// RocksDB column family holding JSON-encoded item metadata (vector stripped).
const METADATA_CF: &str = "metadata";
/// RocksDB column family holding bincode-encoded `VectorRecord` entries.
const VECTOR_INDEX_CF: &str = "vector_index";
/// Size in bytes of the per-record header written before the vector payload.
const VECTOR_HEADER_SIZE: usize = 8; // u64 for dimensions count

/// Threshold of `mark_manifest_dirty` calls after which the manifest is written to disk.
const MANIFEST_SAVE_INTERVAL: u32 = 100; // Save manifest every N operations
53
54impl OptimizedStorage {
55    pub fn new(path: &Path) -> Result<Self> {
56        Ok(Self {
57            path: path.to_path_buf(),
58            db: Arc::new(RwLock::new(None)),
59            vector_file: Arc::new(RwLock::new(None)),
60            vector_mmap: Arc::new(RwLock::new(None)),
61            manifest: Arc::new(RwLock::new(None)),
62            dimensions: Arc::new(RwLock::new(None)),
63            manifest_dirty: Arc::new(RwLock::new(false)),
64            operations_since_save: Arc::new(RwLock::new(0)),
65        })
66    }
67    
68    async fn initialize_storage(&self) -> Result<()> {
69        // Create directory if it doesn't exist
70        if !self.path.exists() {
71            std::fs::create_dir_all(&self.path)?;
72        }
73        
74        // Open RocksDB with optimized settings for vector workloads
75        let db_path = self.path.join("metadata");
76        let mut db_opts = Options::default();
77        db_opts.create_if_missing(true);
78        db_opts.create_missing_column_families(true);
79        
80        // Performance optimizations for vector operations
81        db_opts.set_max_write_buffer_number(4);
82        db_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
83        db_opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB
84        db_opts.set_level_compaction_dynamic_level_bytes(true);
85        db_opts.set_max_bytes_for_level_base(256 * 1024 * 1024); // 256MB
86        db_opts.set_max_background_jobs(4);
87        db_opts.set_bytes_per_sync(64 * 1024 * 1024); // 64MB - sync less frequently
88        
89        // Note: We're not disabling auto-compactions as it can cause issues
90        
91        let cf_names = vec![METADATA_CF, VECTOR_INDEX_CF];
92        let db = DB::open_cf(&db_opts, db_path, cf_names)?;
93        
94        *self.db.write().await = Some(db);
95        
96        // Load or create manifest
97        if let Some(manifest) = self.load_manifest().await? {
98            *self.manifest.write().await = Some(manifest.clone());
99            *self.dimensions.write().await = manifest.dimensions;
100            
101            // Open existing vector file
102            let vector_path = self.path.join("vectors.dat");
103            if vector_path.exists() {
104                let file = OpenOptions::new()
105                    .read(true)
106                    .write(true)
107                    .open(&vector_path)?;
108                
109                let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
110                
111                *self.vector_file.write().await = Some(file);
112                *self.vector_mmap.write().await = Some(mmap);
113            }
114        }
115        
116        Ok(())
117    }
118    
119    async fn create_vector_file(&self, initial_size: u64) -> Result<()> {
120        let vector_path = self.path.join("vectors.dat");
121        
122        let mut file = OpenOptions::new()
123            .read(true)
124            .write(true)
125            .create(true)
126            .truncate(true)
127            .open(&vector_path)?;
128            
129        // Pre-allocate space
130        file.seek(SeekFrom::Start(initial_size - 1))?;
131        file.write_all(&[0])?;
132        file.flush()?;
133        
134        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
135        
136        *self.vector_file.write().await = Some(file);
137        *self.vector_mmap.write().await = Some(mmap);
138        
139        Ok(())
140    }
141    
142    async fn write_vector_to_file(&self, vector: &[f32], offset: u64) -> Result<()> {
143        let mut mmap_guard = self.vector_mmap.write().await;
144        if let Some(ref mut mmap) = *mmap_guard {
145            let start = offset as usize;
146            let dimensions = vector.len();
147            
148            // Check bounds
149            let end_pos = start + VECTOR_HEADER_SIZE + (dimensions * 4);
150            let mmap_len = mmap.len();
151            if end_pos > mmap_len {
152                return Err(VectraError::StorageError {
153                    message: format!("Memory map too small: need {} bytes, have {} bytes", 
154                                   end_pos, mmap_len)
155                });
156            }
157            
158            // Write dimensions count first (8 bytes)
159            let dim_bytes = (dimensions as u64).to_le_bytes();
160            mmap[start..start + 8].copy_from_slice(&dim_bytes);
161            
162            // Write vector data (4 bytes per f32)
163            let vector_start = start + VECTOR_HEADER_SIZE;
164            for (i, &value) in vector.iter().enumerate() {
165                let value_bytes = value.to_le_bytes();
166                let pos = vector_start + (i * 4);
167                mmap[pos..pos + 4].copy_from_slice(&value_bytes);
168            }
169            
170            // Don't flush on every write - let OS handle it for better performance
171            // mmap.flush()?;
172        }
173        
174        Ok(())
175    }
176    
177    async fn read_vector_from_file(&self, offset: u64, expected_dims: usize) -> Result<Vec<f32>> {
178        let mmap_guard = self.vector_mmap.read().await;
179        if let Some(ref mmap) = *mmap_guard {
180            let start = offset as usize;
181            
182            // Read dimensions count
183            let mut dim_bytes = [0u8; 8];
184            dim_bytes.copy_from_slice(&mmap[start..start + 8]);
185            let dimensions = u64::from_le_bytes(dim_bytes) as usize;
186            
187            if dimensions != expected_dims {
188                return Err(VectraError::VectorValidation {
189                    message: format!("Dimension mismatch: expected {}, got {}", expected_dims, dimensions)
190                });
191            }
192            
193            // Read vector data
194            let mut vector = Vec::with_capacity(dimensions);
195            let vector_start = start + VECTOR_HEADER_SIZE;
196            
197            for i in 0..dimensions {
198                let pos = vector_start + (i * 4);
199                let mut value_bytes = [0u8; 4];
200                value_bytes.copy_from_slice(&mmap[pos..pos + 4]);
201                vector.push(f32::from_le_bytes(value_bytes));
202            }
203            
204            Ok(vector)
205        } else {
206            Err(VectraError::StorageError { 
207                message: "Vector file not initialized".to_string() 
208            })
209        }
210    }
211    
212    fn manifest_path(&self) -> PathBuf {
213        self.path.join("manifest.json")
214    }
215    
216    async fn load_manifest(&self) -> Result<Option<Manifest>> {
217        let manifest_path = self.manifest_path();
218        
219        if !manifest_path.exists() {
220            return Ok(None);
221        }
222        
223        let content = fs::read_to_string(manifest_path).await?;
224        let manifest: Manifest = serde_json::from_str(&content)?;
225        Ok(Some(manifest))
226    }
227    
228    async fn save_manifest_to_disk(&self, manifest: &Manifest) -> Result<()> {
229        let manifest_path = self.manifest_path();
230        
231        // Debug: Track manifest saves
232        static SAVE_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
233        let count = SAVE_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
234        if count % 10 == 0 {
235            println!("DEBUG: Manifest save #{}, total_items: {}", count + 1, manifest.total_items);
236        }
237        
238        // Ensure directory exists
239        if let Some(parent) = manifest_path.parent() {
240            fs::create_dir_all(parent).await?;
241        }
242        
243        let start = std::time::Instant::now();
244        let content = serde_json::to_string_pretty(manifest)?;
245        let json_time = start.elapsed();
246        
247        let start = std::time::Instant::now();
248        fs::write(manifest_path, content).await?;
249        let write_time = start.elapsed();
250        
251        if count % 10 == 0 {
252            println!("  JSON serialize: {} µs, Disk write: {} µs", 
253                     json_time.as_micros(), write_time.as_micros());
254        }
255        
256        Ok(())
257    }
258    
259    async fn save_manifest(&self, manifest: &Manifest) -> Result<()> {
260        self.save_manifest_to_disk(manifest).await?;
261        
262        // Update in-memory manifest
263        *self.manifest.write().await = Some(manifest.clone());
264        
265        Ok(())
266    }
267    
268    /// Mark manifest as dirty and potentially save it based on batching interval
269    async fn mark_manifest_dirty(&self) -> Result<()> {
270        *self.manifest_dirty.write().await = true;
271        
272        let mut ops_count = self.operations_since_save.write().await;
273        *ops_count += 1;
274        
275        // Save manifest every N operations for crash safety vs performance balance
276        if *ops_count >= MANIFEST_SAVE_INTERVAL {
277            drop(ops_count); // Release lock before calling save
278            self.flush_manifest_if_dirty().await?;
279        }
280        
281        Ok(())
282    }
283    
284    /// Save manifest to disk if it's been modified, reset dirty flag
285    async fn flush_manifest_if_dirty(&self) -> Result<()> {
286        let is_dirty = {
287            let dirty = self.manifest_dirty.read().await;
288            *dirty
289        };
290        
291        if is_dirty {
292            let manifest = {
293                let manifest_guard = self.manifest.read().await;
294                manifest_guard.clone()
295            };
296            
297            if let Some(manifest) = manifest {
298                self.save_manifest_to_disk(&manifest).await?;
299                *self.manifest_dirty.write().await = false;
300                *self.operations_since_save.write().await = 0;
301            }
302        }
303        
304        Ok(())
305    }
306    
    /// Grows `vectors.dat` (and re-creates its memory map) when the file is
    /// smaller than `needed_size` bytes.
    ///
    /// The new size is the larger of 1.5x the current size and
    /// `needed_size` + 10MB. Nothing is changed when the file is already
    /// large enough.
    ///
    /// # Errors
    /// Returns `VectraError::StorageError` when `vectors.dat` does not exist and
    /// propagates metadata, flush, open, resize and mmap failures.
    // NOTE(review): if resizing or remapping fails after the old map was
    // dropped, the mmap and file slots stay `None` — confirm callers cope.
    async fn ensure_vector_file_capacity(&self, needed_size: u64) -> Result<()> {
        let vector_path = self.path.join("vectors.dat");

        // Check if file exists first
        if !vector_path.exists() {
            return Err(VectraError::StorageError {
                message: "Vector file does not exist".to_string()
            });
        }

        // Get current file size
        let metadata = tokio::fs::metadata(&vector_path).await?;
        let current_size = metadata.len();

        if needed_size > current_size {
            // Calculate new size with some headroom (grow by at least 50% or to needed size + 10MB)
            let new_size = std::cmp::max(
                (current_size as f64 * 1.5) as u64,
                needed_size + (10 * 1024 * 1024) // Add 10MB buffer
            );

            println!("    📈 Growing vector file from {} MB to {} MB", 
                     current_size / 1024 / 1024, 
                     new_size / 1024 / 1024);

            // We need to recreate the memory map with a larger file
            // First, flush and drop the current mmap
            // (flushing first writes pending changes back to the file before the map goes away)
            {
                let mut mmap_guard = self.vector_mmap.write().await;
                if let Some(ref mut mmap) = *mmap_guard {
                    mmap.flush()?;
                }
                *mmap_guard = None; // Drop the old mmap
            }

            // Also drop the file handle
            {
                let mut file_guard = self.vector_file.write().await;
                *file_guard = None;
            }

            // Resize the file
            // (re-opened without `create`/`truncate`, so existing contents are kept)
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&vector_path)?;

            file.set_len(new_size)?;
            file.sync_all()?; // Ensure the resize is flushed

            // Create new memory map
            let mmap = unsafe { MmapOptions::new().map_mut(&file)? };

            // Update the handles
            *self.vector_file.write().await = Some(file);
            *self.vector_mmap.write().await = Some(mmap);

            println!("    ✅ Vector file resized successfully");
        }

        Ok(())
    }
369    
370    async fn get_next_vector_offset(&self, vector_size: usize) -> Result<u64> {
371        let current_offset = {
372            let mut manifest_guard = self.manifest.write().await;
373            if let Some(ref mut manifest) = *manifest_guard {
374                let current_offset = manifest.next_vector_offset;
375                let record_size = VECTOR_HEADER_SIZE + (vector_size * 4); // header + 4 bytes per f32
376                manifest.next_vector_offset += record_size as u64;
377                manifest.vector_file_size = manifest.next_vector_offset;
378                current_offset
379            } else {
380                return Err(VectraError::StorageError {
381                    message: "Manifest not initialized".to_string()
382                });
383            }
384        };
385        
386        // Don't mark dirty here - let the caller decide when to mark dirty
387        Ok(current_offset)
388    }
389    
390    async fn get_next_vector_offset_and_mark_dirty(&self, vector_size: usize) -> Result<u64> {
391        let offset = self.get_next_vector_offset(vector_size).await?;
392        self.mark_manifest_dirty().await?;
393        Ok(offset)
394    }
395    
396    /// Ensure all pending changes are flushed to disk
397    pub async fn flush(&self) -> Result<()> {
398        // Flush manifest
399        self.flush_manifest_if_dirty().await?;
400        
401        // Flush memory-mapped file if it exists
402        if let Some(ref mmap) = *self.vector_mmap.read().await {
403            mmap.flush()?;
404        }
405        
406        // Flush RocksDB
407        if let Some(ref db) = *self.db.read().await {
408            db.flush()?;
409        }
410        
411        Ok(())
412    }
413}
414
415#[async_trait]
416impl StorageBackend for OptimizedStorage {
417    async fn exists(&self) -> bool {
418        self.manifest_path().exists()
419    }
420    
421    async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()> {
422        let manifest_path = self.manifest_path();
423        
424        if manifest_path.exists() && !config.delete_if_exists {
425            return Err(VectraError::IndexAlreadyExists { 
426                path: manifest_path.to_string_lossy().to_string() 
427            });
428        }
429        
430        // Clean up existing files if delete_if_exists is true
431        if config.delete_if_exists && self.path.exists() {
432            fs::remove_dir_all(&self.path).await.ok();
433        }
434        
435        let manifest = Manifest {
436            version: 2,
437            format: "optimized".to_string(),
438            created_at: chrono::Utc::now(),
439            dimensions: None,
440            distance_metric: config.distance_metric.clone(),
441            total_items: 0,
442            vector_file_size: 0,
443            next_vector_offset: 0,
444        };
445        
446        self.save_manifest(&manifest).await?;
447        
448        // Initialize storage components
449        self.initialize_storage().await?;
450        
451        // Make sure manifest is persisted for create operations
452        self.flush_manifest_if_dirty().await?;
453        
454        // Create initial vector file (10MB initial size - reasonable for ~6.5K vectors with 384 dims)
455        // File will grow dynamically as needed
456        self.create_vector_file(10 * 1024 * 1024).await?;
457        
458        Ok(())
459    }
460    
461    async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
462        // Ensure storage is initialized for read operations
463        if self.db.read().await.is_none() {
464            self.initialize_storage().await?;
465        }
466
467        let db_guard = self.db.read().await;
468        if let Some(ref db) = *db_guard {
469            let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
470            let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
471            
472            let id_bytes = id.as_bytes();
473            
474            // Get metadata
475            if let Some(metadata_bytes) = db.get_cf(metadata_cf, id_bytes)? {
476                let mut item: VectorItem = serde_json::from_slice(&metadata_bytes)?;
477                
478                // Get vector record
479                if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
480                    let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
481                    
482                    if !vector_record.deleted {
483                        // Read vector from memory-mapped file
484                        item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
485                        return Ok(Some(item));
486                    }
487                }
488            }
489        }
490        
491        Ok(None)
492    }
493    
494    async fn insert_item(&mut self, item: &VectorItem) -> Result<()> {
495        // Ensure storage is initialized
496        if self.db.read().await.is_none() {
497            self.initialize_storage().await?;
498        }
499        
500        // Add timing for debugging
501        let start_total = std::time::Instant::now();
502        
503        // Debug: count inserts
504        static DEBUG_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
505        let debug_count = DEBUG_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
506        if debug_count % 50 == 0 {
507            println!("DEBUG: insert_item called {} times", debug_count + 1);
508        }
509        
510        let dimensions = item.vector.len();
511        
512        // Set dimensions if this is the first item
513        let mut needs_manifest_update = false;
514        {
515            let mut dims_guard = self.dimensions.write().await;
516            if dims_guard.is_none() {
517                *dims_guard = Some(dimensions);
518                needs_manifest_update = true;
519            } else if let Some(existing_dims) = *dims_guard {
520                if existing_dims != dimensions {
521                    return Err(VectraError::VectorValidation {
522                        message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, dimensions)
523                    });
524                }
525            }
526        }
527        
528        // Update manifest outside of the dimensions lock to avoid deadlock
529        if needs_manifest_update {
530            let mut manifest_guard = self.manifest.write().await;
531            if let Some(ref mut manifest) = *manifest_guard {
532                manifest.dimensions = Some(dimensions);
533                // Save manifest to disk only, we already have the in-memory version updated
534                self.save_manifest_to_disk(manifest).await?;
535            }
536        }
537        
538        // Get offset for vector storage
539        let start = std::time::Instant::now();
540        let vector_offset = self.get_next_vector_offset_and_mark_dirty(dimensions).await?;
541        let offset_time = start.elapsed();
542        
543        // Write vector to memory-mapped file
544        let start = std::time::Instant::now();
545        self.write_vector_to_file(&item.vector, vector_offset).await?;
546        let mmap_time = start.elapsed();
547        
548        // Store metadata and vector record in RocksDB
549        let db_guard = self.db.read().await;
550        if let Some(ref db) = *db_guard {
551            let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
552            let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
553            
554            let id_bytes = item.id.as_bytes();
555            
556            // Store metadata (without vector data) using JSON to handle serde_json::Value
557            let mut metadata_item = item.clone();
558            metadata_item.vector = Vec::new(); // Don't store vector in metadata
559            let metadata_bytes = serde_json::to_vec(&metadata_item)?;
560            // Use write options for better performance
561            let mut write_opts = rocksdb::WriteOptions::default();
562            write_opts.disable_wal(true); // Disable WAL for better performance
563            
564            let start = std::time::Instant::now();
565            db.put_cf_opt(metadata_cf, id_bytes, metadata_bytes, &write_opts)?;
566            
567            // Store vector record
568            let vector_record = VectorRecord {
569                id: item.id,
570                offset: vector_offset,
571                dimensions,
572                deleted: false,
573            };
574            let vector_record_bytes = bincode::serialize(&vector_record)?;
575            db.put_cf_opt(vector_index_cf, id_bytes, vector_record_bytes, &write_opts)?;
576            let db_time = start.elapsed();
577            
578            // Update manifest and log timing
579            let total_items = {
580                let mut manifest_guard = self.manifest.write().await;
581                if let Some(ref mut manifest) = *manifest_guard {
582                    manifest.total_items += 1;
583                    manifest.total_items
584                } else {
585                    return Err(VectraError::StorageError {
586                        message: "Manifest not initialized".to_string()
587                    });
588                }
589            };
590            
591            // Mark manifest dirty for batched saving
592            self.mark_manifest_dirty().await?;
593            
594            // Log timing periodically based on total_items
595            if total_items == 50 || total_items == 100 || total_items == 200 || total_items == 500 || total_items == 1000 {
596                let total_time = start_total.elapsed();
597                println!("    Storage: After {} items, insert took {} µs (offset: {} µs, mmap: {} µs, db: {} µs, rest: {} µs)",
598                         total_items,
599                         total_time.as_micros(),
600                         offset_time.as_micros(),
601                         mmap_time.as_micros(),
602                         db_time.as_micros(),
603                         total_time.as_micros() - offset_time.as_micros() - mmap_time.as_micros() - db_time.as_micros());
604            }
605        }
606        
607        Ok(())
608    }
609    
    /// Inserts a batch of items in one pass.
    ///
    /// An empty batch is a no-op. Otherwise all vectors must share one
    /// dimensionality, which must also match the index (the first batch fixes
    /// and persists it). The vector file is grown once for the whole batch,
    /// all offsets are reserved under a single manifest lock, the vectors are
    /// written to the memory map, and metadata plus index entries are written
    /// to RocksDB in one `WriteBatch`. Finally `total_items` is increased and
    /// the manifest is marked dirty once for the whole batch.
    ///
    /// # Errors
    /// `VectraError::VectorValidation` on any dimension mismatch,
    /// `VectraError::StorageError` when no manifest is loaded, plus any
    /// propagated storage, serialization or database failure.
    ///
    /// # Panics
    /// Panics if either column family handle is missing from the database.
    async fn insert_items(&mut self, items: &[VectorItem]) -> Result<()> {
        if items.is_empty() {
            return Ok(());
        }
        
        // Ensure storage is initialized
        if self.db.read().await.is_none() {
            self.initialize_storage().await?;
        }
        
        // Validate all items have same dimensions
        let first_dimensions = items[0].vector.len();
        for item in items {
            if item.vector.len() != first_dimensions {
                return Err(VectraError::VectorValidation {
                    message: format!("All vectors must have same dimensions. Expected {}, got {}", first_dimensions, item.vector.len())
                });
            }
        }
        
        // Set dimensions if this is the first batch
        let mut needs_manifest_update = false;
        {
            let mut dims_guard = self.dimensions.write().await;
            if dims_guard.is_none() {
                *dims_guard = Some(first_dimensions);
                needs_manifest_update = true;
            } else if let Some(existing_dims) = *dims_guard {
                if existing_dims != first_dimensions {
                    return Err(VectraError::VectorValidation {
                        message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, first_dimensions)
                    });
                }
            }
        }
        
        // Update manifest dimensions if needed
        // (done after the dimensions guard above went out of scope)
        if needs_manifest_update {
            let mut manifest_guard = self.manifest.write().await;
            if let Some(ref mut manifest) = *manifest_guard {
                manifest.dimensions = Some(first_dimensions);
                self.save_manifest_to_disk(manifest).await?;
            }
        }
        
        // Calculate total space needed for all vectors
        let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
        let total_space_needed = items.len() * record_size;
        
        // Pre-check and grow file if needed before any writes
        // (skipped when no manifest is loaded; the offset reservation below
        // then fails with "Manifest not initialized")
        {
            let manifest = self.manifest.read().await;
            if let Some(ref m) = *manifest {
                let current_offset = m.next_vector_offset;
                let final_size = current_offset + total_space_needed as u64;
                drop(manifest); // Release the lock before calling ensure_vector_file_capacity
                
                // Only print for large batches
                if items.len() >= 2500 {
                    println!("DEBUG: Batch size: {}, Current offset: {}, Final size needed: {}", 
                             items.len(), current_offset, final_size);
                }
                
                self.ensure_vector_file_capacity(final_size).await?;
            }
        }
        
        // Pre-allocate ALL vector offsets at once to avoid repeated lock acquisition
        let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
        let mut offsets = Vec::with_capacity(items.len());
        {
            let mut manifest_guard = self.manifest.write().await;
            if let Some(ref mut manifest) = *manifest_guard {
                let mut current_offset = manifest.next_vector_offset;
                for _ in 0..items.len() {
                    offsets.push(current_offset);
                    current_offset += record_size as u64;
                }
                manifest.next_vector_offset = current_offset;
                manifest.vector_file_size = current_offset;
            } else {
                return Err(VectraError::StorageError {
                    message: "Manifest not initialized".to_string()
                });
            }
        }
        
        // Now write vectors and prepare data without repeated lock acquisition
        let mut prepared_data = Vec::with_capacity(items.len());
        for (idx, item) in items.iter().enumerate() {
            let vector_offset = offsets[idx];
            self.write_vector_to_file(&item.vector, vector_offset).await?;
            
            // Prepare metadata (without vector data) using JSON
            let mut metadata_item = item.clone();
            metadata_item.vector = Vec::new();
            let metadata_bytes = serde_json::to_vec(&metadata_item)?;
            
            // Prepare vector record
            let vector_record = VectorRecord {
                id: item.id,
                offset: vector_offset,
                dimensions: first_dimensions,
                deleted: false,
            };
            let vector_record_bytes = bincode::serialize(&vector_record)?;
            
            prepared_data.push((item.id.as_bytes().to_vec(), metadata_bytes, vector_record_bytes));
        }
        
        // Bulk write to database
        // (the count is taken before `prepared_data` is consumed by the loop below)
        let total_items_added = prepared_data.len();
        {
            let db_guard = self.db.read().await;
            if let Some(ref db) = *db_guard {
                let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
                let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
                
                // Use RocksDB write batch for better performance
                let mut batch = rocksdb::WriteBatch::default();
                
                for (id_bytes, metadata_bytes, vector_record_bytes) in prepared_data {
                    batch.put_cf(metadata_cf, &id_bytes, metadata_bytes);
                    batch.put_cf(vector_index_cf, &id_bytes, vector_record_bytes);
                }
                
                // Execute batch write
                db.write(batch)?;
            }
        }
        
        // Update manifest
        {
            let mut manifest_guard = self.manifest.write().await;
            if let Some(ref mut manifest) = *manifest_guard {
                manifest.total_items += total_items_added;
            }
        }
        
        // Mark manifest dirty for batched saving
        // (one call per batch, regardless of how many items were added)
        self.mark_manifest_dirty().await?;
        
        Ok(())
    }
754    
755    async fn update_item(&mut self, item: &VectorItem) -> Result<()> {
756        // For now, implement as delete + insert
757        self.delete_item(&item.id).await?;
758        self.insert_item(item).await?;
759        Ok(())
760    }
761    
762    async fn delete_item(&mut self, id: &Uuid) -> Result<()> {
763        let db_guard = self.db.read().await;
764        if let Some(ref db) = *db_guard {
765            let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
766            let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
767            
768            let id_bytes = id.as_bytes();
769            
770            // Mark vector record as deleted (we don't actually remove from file for now)
771            if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
772                let mut vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
773                vector_record.deleted = true;
774                let updated_bytes = bincode::serialize(&vector_record)?;
775                db.put_cf(vector_index_cf, id_bytes, updated_bytes)?;
776            }
777            
778            // Remove metadata
779            db.delete_cf(metadata_cf, id_bytes)?;
780            
781            // Update manifest
782            let should_mark_dirty = {
783                let mut manifest_guard = self.manifest.write().await;
784                if let Some(ref mut manifest) = *manifest_guard {
785                    if manifest.total_items > 0 {
786                        manifest.total_items -= 1;
787                        true
788                    } else {
789                        false
790                    }
791                } else {
792                    false
793                }
794            };
795            
796            // Mark manifest dirty for batched saving if updated
797            if should_mark_dirty {
798                self.mark_manifest_dirty().await?;
799            }
800        }
801        
802        Ok(())
803    }
804    
805    async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
806        // Ensure storage is initialized for read operations
807        if self.db.read().await.is_none() {
808            self.initialize_storage().await?;
809        }
810
811        // Collect all the metadata records first without holding DB references
812        let metadata_records = {
813            let db_guard = self.db.read().await;
814            if let Some(ref db) = *db_guard {
815                let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
816                let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
817                
818                let mut records = Vec::new();
819                let iter = db.iterator_cf(metadata_cf, rocksdb::IteratorMode::Start);
820                
821                for item in iter {
822                    let (key, value) = item?;
823                    
824                    // Check if item is not deleted
825                    if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, &key)? {
826                        let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
827                        
828                        if !vector_record.deleted {
829                            let metadata_item: VectorItem = serde_json::from_slice(&value)?;
830                            records.push((metadata_item, vector_record));
831                            
832                            // Apply limit if specified
833                            if let Some(ref opts) = options {
834                                if let Some(limit) = opts.limit {
835                                    if records.len() >= limit {
836                                        break;
837                                    }
838                                }
839                            }
840                        }
841                    }
842                }
843                
844                records
845            } else {
846                Vec::new()
847            }
848        };
849        
850        // Now load vectors without holding DB guard
851        let mut items = Vec::new();
852        for (mut metadata_item, vector_record) in metadata_records {
853            // Load vector from file
854            metadata_item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
855            items.push(metadata_item);
856        }
857        
858        Ok(items)
859    }
860    
861    async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>> {
862        if let Some(ref query_vector) = query.vector {
863            // For now, implement basic brute-force similarity search
864            // In a real implementation, this would use HNSW index
865            let all_items = self.list_items(None).await?;
866            let mut results = Vec::new();
867            
868            for item in all_items {
869                if item.vector.len() == query_vector.len() {
870                    let similarity = VectorOps::cosine_similarity(query_vector, &item.vector);
871                    results.push(QueryResult {
872                        item,
873                        score: similarity,
874                    });
875                }
876            }
877            
878            // Sort by similarity (descending)
879            results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
880            
881            // Apply limit
882            results.truncate(query.top_k);
883            
884            Ok(results)
885        } else {
886            Ok(Vec::new())
887        }
888    }
889    
890    async fn begin_transaction(&mut self) -> Result<()> {
891        // For simplicity, we'll use RocksDB's default transaction behavior
892        // In a full implementation, we'd use RocksDB transactions
893        Ok(())
894    }
895    
896    async fn commit_transaction(&mut self) -> Result<()> {
897        // Flush manifest to disk
898        self.flush_manifest_if_dirty().await?;
899
900        // Flush any pending writes
901        let db_guard = self.db.read().await;
902        if let Some(ref db) = *db_guard {
903            db.flush()?;
904        }
905
906        if let Some(ref mut mmap_guard) = *self.vector_mmap.write().await {
907            mmap_guard.flush()?;
908        }
909
910        Ok(())
911    }
912    
913    async fn rollback_transaction(&mut self) -> Result<()> {
914        // In a full implementation, we'd rollback pending changes
915        // For now, just return success
916        Ok(())
917    }
918    
919    async fn delete_index(&mut self) -> Result<()> {
920        // Close database and memory map first
921        *self.db.write().await = None;
922        *self.vector_file.write().await = None;
923        *self.vector_mmap.write().await = None;
924        *self.manifest.write().await = None;
925        
926        // Remove all files in the index directory
927        if self.path.exists() {
928            fs::remove_dir_all(&self.path).await?;
929        }
930        Ok(())
931    }
932    
933    async fn get_stats(&self) -> Result<IndexStats> {
934        if let Some(manifest) = self.load_manifest().await? {
935            let size = if self.path.exists() {
936                // Calculate directory size
937                let mut total_size = 0u64;
938                let mut entries = fs::read_dir(&self.path).await?;
939                while let Some(entry) = entries.next_entry().await? {
940                    if let Ok(metadata) = entry.metadata().await {
941                        total_size += metadata.len();
942                    }
943                }
944                total_size
945            } else {
946                0
947            };
948            
949            Ok(IndexStats {
950                items: manifest.total_items,
951                size,
952                dimensions: manifest.dimensions,
953                distance_metric: manifest.distance_metric,
954            })
955        } else {
956            Ok(IndexStats {
957                items: 0,
958                size: 0,
959                dimensions: None,
960                distance_metric: DistanceMetric::Cosine,
961            })
962        }
963    }
964}
965
#[cfg(test)]
mod tests {
    use super::*;
    use vectrust_core::StorageBackend;
    use tempfile::TempDir;

    /// Creates a storage backed by a fresh temporary directory with an index
    /// already created using the default configuration.
    ///
    /// The `TempDir` is returned so the directory outlives the test body.
    async fn fresh_index() -> (TempDir, OptimizedStorage) {
        let workdir = TempDir::new().unwrap();
        let mut storage = OptimizedStorage::new(workdir.path()).unwrap();
        storage
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();
        (workdir, storage)
    }

    /// An index does not exist before `create_index` and does afterwards.
    #[tokio::test]
    async fn test_optimized_storage_creation() {
        let workdir = TempDir::new().unwrap();
        let mut storage = OptimizedStorage::new(workdir.path()).unwrap();

        assert!(!storage.exists().await);

        storage
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();

        assert!(storage.exists().await);
    }

    /// An inserted item can be fetched back with the same id and vector.
    #[tokio::test]
    async fn test_optimized_storage_insert_and_get() {
        let (_workdir, mut storage) = fresh_index().await;

        let original = VectorItem {
            id: Uuid::new_v4(),
            vector: vec![1.0, 0.0, 0.0],
            metadata: serde_json::json!({"test": "data"}),
            ..Default::default()
        };

        storage.insert_item(&original).await.unwrap();

        let fetched = storage.get_item(&original.id).await.unwrap();
        assert!(fetched.is_some());

        let fetched = fetched.unwrap();
        assert_eq!(fetched.id, original.id);
        assert_eq!(fetched.vector, original.vector);
    }

    /// A query returns both items, ranked with the closer one first.
    #[tokio::test]
    async fn test_optimized_storage_query() {
        let (_workdir, mut storage) = fresh_index().await;

        let along_x = VectorItem {
            id: Uuid::new_v4(),
            vector: vec![1.0, 0.0, 0.0],
            metadata: serde_json::json!({"name": "item1"}),
            ..Default::default()
        };

        let along_y = VectorItem {
            id: Uuid::new_v4(),
            vector: vec![0.0, 1.0, 0.0],
            metadata: serde_json::json!({"name": "item2"}),
            ..Default::default()
        };

        storage.insert_item(&along_x).await.unwrap();
        storage.insert_item(&along_y).await.unwrap();

        let query = Query {
            vector: Some(vec![1.0, 0.1, 0.0]),
            text: None,
            top_k: 2,
            filter: None,
        };

        let hits = storage.query_items(&query).await.unwrap();
        assert_eq!(hits.len(), 2);

        // The query points mostly along x, so the x-aligned item ranks first.
        assert_eq!(hits[0].item.id, along_x.id);
        assert!(hits[0].score > hits[1].score);
    }
}
1049}