1use async_trait::async_trait;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use tokio::fs;
5use tokio::sync::RwLock;
6use vectrust_core::*;
7use uuid::Uuid;
8use serde::{Serialize, Deserialize};
9use rocksdb::{DB, Options};
10use memmap2::{MmapMut, MmapOptions};
11use std::fs::OpenOptions;
12use std::io::{Seek, SeekFrom, Write};
13use bincode;
14
/// Vector index storage that keeps item metadata in RocksDB and raw vector
/// data in a memory-mapped file (`vectors.dat`) under a single directory.
///
/// Every piece of mutable state lives behind its own `Arc<RwLock<..>>`, so the
/// read-only trait methods (`&self`) can still initialize the backend lazily.
pub struct OptimizedStorage {
    /// Root directory of the index; the manifest, the RocksDB directory and
    /// the vector file are all created beneath it.
    path: PathBuf,
    /// RocksDB handle; `None` until `initialize_storage` has opened it.
    db: Arc<RwLock<Option<DB>>>,
    /// Open handle to `vectors.dat`, stored alongside the mapping built from it.
    vector_file: Arc<RwLock<Option<std::fs::File>>>,
    /// Writable memory map over `vectors.dat`; `None` while no file is mapped.
    vector_mmap: Arc<RwLock<Option<MmapMut>>>,
    /// In-memory copy of the manifest; may be ahead of the copy on disk.
    manifest: Arc<RwLock<Option<Manifest>>>,
    /// Vector dimensionality, taken from the manifest or from the first insert.
    dimensions: Arc<RwLock<Option<usize>>>,
    /// Set when the in-memory manifest has changes not yet written to disk.
    manifest_dirty: Arc<RwLock<bool>>,
    /// Number of `mark_manifest_dirty` calls since the manifest was last flushed.
    operations_since_save: Arc<RwLock<u32>>,
}
27
/// Index-level bookkeeping, persisted as pretty-printed JSON in `manifest.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
    /// Manifest schema version (`create_index` writes `2`).
    pub version: u32,
    /// Storage format tag (`create_index` writes `"optimized"`).
    pub format: String,
    /// Time at which the index was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Dimensionality shared by all stored vectors; `None` until the first insert.
    pub dimensions: Option<usize>,
    /// Distance metric the index was configured with.
    pub distance_metric: DistanceMetric,
    /// Running count of stored items, adjusted on insert and delete.
    pub total_items: usize,
    /// Bytes of the vector file handed out so far; kept equal to
    /// `next_vector_offset` whenever space is reserved.
    pub vector_file_size: u64,
    /// Byte offset at which the next vector record will be written.
    pub next_vector_offset: u64,
}
39
/// Entry of the `vector_index` column family: where one item's vector lives
/// inside the vector file. Serialized with `bincode`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct VectorRecord {
    /// Id of the item that owns the vector.
    pub id: Uuid,
    /// Byte offset of the record (header included) within the vector file.
    pub offset: u64,
    /// Number of `f32` components stored in the record.
    pub dimensions: usize,
    /// Tombstone flag: set by `delete_item`; flagged records are skipped on read.
    pub deleted: bool,
}
47
/// RocksDB column family holding the JSON-serialized item metadata.
const METADATA_CF: &str = "metadata";
/// RocksDB column family holding the bincode-serialized `VectorRecord`s.
const VECTOR_INDEX_CF: &str = "vector_index";
/// Size in bytes of the header preceding every vector in the vector file;
/// the header stores the dimension count as a little-endian `u64`.
const VECTOR_HEADER_SIZE: usize = 8;
/// Number of `mark_manifest_dirty` calls after which the manifest is flushed.
const MANIFEST_SAVE_INTERVAL: u32 = 100;

impl OptimizedStorage {
55 pub fn new(path: &Path) -> Result<Self> {
56 Ok(Self {
57 path: path.to_path_buf(),
58 db: Arc::new(RwLock::new(None)),
59 vector_file: Arc::new(RwLock::new(None)),
60 vector_mmap: Arc::new(RwLock::new(None)),
61 manifest: Arc::new(RwLock::new(None)),
62 dimensions: Arc::new(RwLock::new(None)),
63 manifest_dirty: Arc::new(RwLock::new(false)),
64 operations_since_save: Arc::new(RwLock::new(0)),
65 })
66 }
67
68 async fn initialize_storage(&self) -> Result<()> {
69 if !self.path.exists() {
71 std::fs::create_dir_all(&self.path)?;
72 }
73
74 let db_path = self.path.join("metadata");
76 let mut db_opts = Options::default();
77 db_opts.create_if_missing(true);
78 db_opts.create_missing_column_families(true);
79
80 db_opts.set_max_write_buffer_number(4);
82 db_opts.set_write_buffer_size(64 * 1024 * 1024); db_opts.set_target_file_size_base(64 * 1024 * 1024); db_opts.set_level_compaction_dynamic_level_bytes(true);
85 db_opts.set_max_bytes_for_level_base(256 * 1024 * 1024); db_opts.set_max_background_jobs(4);
87 db_opts.set_bytes_per_sync(64 * 1024 * 1024); let cf_names = vec![METADATA_CF, VECTOR_INDEX_CF];
92 let db = DB::open_cf(&db_opts, db_path, cf_names)?;
93
94 *self.db.write().await = Some(db);
95
96 if let Some(manifest) = self.load_manifest().await? {
98 *self.manifest.write().await = Some(manifest.clone());
99 *self.dimensions.write().await = manifest.dimensions;
100
101 let vector_path = self.path.join("vectors.dat");
103 if vector_path.exists() {
104 let file = OpenOptions::new()
105 .read(true)
106 .write(true)
107 .open(&vector_path)?;
108
109 let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
110
111 *self.vector_file.write().await = Some(file);
112 *self.vector_mmap.write().await = Some(mmap);
113 }
114 }
115
116 Ok(())
117 }
118
119 async fn create_vector_file(&self, initial_size: u64) -> Result<()> {
120 let vector_path = self.path.join("vectors.dat");
121
122 let mut file = OpenOptions::new()
123 .read(true)
124 .write(true)
125 .create(true)
126 .truncate(true)
127 .open(&vector_path)?;
128
129 file.seek(SeekFrom::Start(initial_size - 1))?;
131 file.write_all(&[0])?;
132 file.flush()?;
133
134 let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
135
136 *self.vector_file.write().await = Some(file);
137 *self.vector_mmap.write().await = Some(mmap);
138
139 Ok(())
140 }
141
142 async fn write_vector_to_file(&self, vector: &[f32], offset: u64) -> Result<()> {
143 let mut mmap_guard = self.vector_mmap.write().await;
144 if let Some(ref mut mmap) = *mmap_guard {
145 let start = offset as usize;
146 let dimensions = vector.len();
147
148 let end_pos = start + VECTOR_HEADER_SIZE + (dimensions * 4);
150 let mmap_len = mmap.len();
151 if end_pos > mmap_len {
152 return Err(VectraError::StorageError {
153 message: format!("Memory map too small: need {} bytes, have {} bytes",
154 end_pos, mmap_len)
155 });
156 }
157
158 let dim_bytes = (dimensions as u64).to_le_bytes();
160 mmap[start..start + 8].copy_from_slice(&dim_bytes);
161
162 let vector_start = start + VECTOR_HEADER_SIZE;
164 for (i, &value) in vector.iter().enumerate() {
165 let value_bytes = value.to_le_bytes();
166 let pos = vector_start + (i * 4);
167 mmap[pos..pos + 4].copy_from_slice(&value_bytes);
168 }
169
170 }
173
174 Ok(())
175 }
176
177 async fn read_vector_from_file(&self, offset: u64, expected_dims: usize) -> Result<Vec<f32>> {
178 let mmap_guard = self.vector_mmap.read().await;
179 if let Some(ref mmap) = *mmap_guard {
180 let start = offset as usize;
181
182 let mut dim_bytes = [0u8; 8];
184 dim_bytes.copy_from_slice(&mmap[start..start + 8]);
185 let dimensions = u64::from_le_bytes(dim_bytes) as usize;
186
187 if dimensions != expected_dims {
188 return Err(VectraError::VectorValidation {
189 message: format!("Dimension mismatch: expected {}, got {}", expected_dims, dimensions)
190 });
191 }
192
193 let mut vector = Vec::with_capacity(dimensions);
195 let vector_start = start + VECTOR_HEADER_SIZE;
196
197 for i in 0..dimensions {
198 let pos = vector_start + (i * 4);
199 let mut value_bytes = [0u8; 4];
200 value_bytes.copy_from_slice(&mmap[pos..pos + 4]);
201 vector.push(f32::from_le_bytes(value_bytes));
202 }
203
204 Ok(vector)
205 } else {
206 Err(VectraError::StorageError {
207 message: "Vector file not initialized".to_string()
208 })
209 }
210 }
211
212 fn manifest_path(&self) -> PathBuf {
213 self.path.join("manifest.json")
214 }
215
216 async fn load_manifest(&self) -> Result<Option<Manifest>> {
217 let manifest_path = self.manifest_path();
218
219 if !manifest_path.exists() {
220 return Ok(None);
221 }
222
223 let content = fs::read_to_string(manifest_path).await?;
224 let manifest: Manifest = serde_json::from_str(&content)?;
225 Ok(Some(manifest))
226 }
227
228 async fn save_manifest_to_disk(&self, manifest: &Manifest) -> Result<()> {
229 let manifest_path = self.manifest_path();
230
231 static SAVE_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
233 let count = SAVE_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
234 if count % 10 == 0 {
235 println!("DEBUG: Manifest save #{}, total_items: {}", count + 1, manifest.total_items);
236 }
237
238 if let Some(parent) = manifest_path.parent() {
240 fs::create_dir_all(parent).await?;
241 }
242
243 let start = std::time::Instant::now();
244 let content = serde_json::to_string_pretty(manifest)?;
245 let json_time = start.elapsed();
246
247 let start = std::time::Instant::now();
248 fs::write(manifest_path, content).await?;
249 let write_time = start.elapsed();
250
251 if count % 10 == 0 {
252 println!(" JSON serialize: {} µs, Disk write: {} µs",
253 json_time.as_micros(), write_time.as_micros());
254 }
255
256 Ok(())
257 }
258
259 async fn save_manifest(&self, manifest: &Manifest) -> Result<()> {
260 self.save_manifest_to_disk(manifest).await?;
261
262 *self.manifest.write().await = Some(manifest.clone());
264
265 Ok(())
266 }
267
268 async fn mark_manifest_dirty(&self) -> Result<()> {
270 *self.manifest_dirty.write().await = true;
271
272 let mut ops_count = self.operations_since_save.write().await;
273 *ops_count += 1;
274
275 if *ops_count >= MANIFEST_SAVE_INTERVAL {
277 drop(ops_count); self.flush_manifest_if_dirty().await?;
279 }
280
281 Ok(())
282 }
283
284 async fn flush_manifest_if_dirty(&self) -> Result<()> {
286 let is_dirty = {
287 let dirty = self.manifest_dirty.read().await;
288 *dirty
289 };
290
291 if is_dirty {
292 let manifest = {
293 let manifest_guard = self.manifest.read().await;
294 manifest_guard.clone()
295 };
296
297 if let Some(manifest) = manifest {
298 self.save_manifest_to_disk(&manifest).await?;
299 *self.manifest_dirty.write().await = false;
300 *self.operations_since_save.write().await = 0;
301 }
302 }
303
304 Ok(())
305 }
306
307 async fn ensure_vector_file_capacity(&self, needed_size: u64) -> Result<()> {
308 let vector_path = self.path.join("vectors.dat");
309
310 if !vector_path.exists() {
312 return Err(VectraError::StorageError {
313 message: "Vector file does not exist".to_string()
314 });
315 }
316
317 let metadata = tokio::fs::metadata(&vector_path).await?;
319 let current_size = metadata.len();
320
321 if needed_size > current_size {
322 let new_size = std::cmp::max(
324 (current_size as f64 * 1.5) as u64,
325 needed_size + (10 * 1024 * 1024) );
327
328 println!(" 📈 Growing vector file from {} MB to {} MB",
329 current_size / 1024 / 1024,
330 new_size / 1024 / 1024);
331
332 {
335 let mut mmap_guard = self.vector_mmap.write().await;
336 if let Some(ref mut mmap) = *mmap_guard {
337 mmap.flush()?;
338 }
339 *mmap_guard = None; }
341
342 {
344 let mut file_guard = self.vector_file.write().await;
345 *file_guard = None;
346 }
347
348 let file = OpenOptions::new()
350 .read(true)
351 .write(true)
352 .open(&vector_path)?;
353
354 file.set_len(new_size)?;
355 file.sync_all()?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
359
360 *self.vector_file.write().await = Some(file);
362 *self.vector_mmap.write().await = Some(mmap);
363
364 println!(" ✅ Vector file resized successfully");
365 }
366
367 Ok(())
368 }
369
370 async fn get_next_vector_offset(&self, vector_size: usize) -> Result<u64> {
371 let current_offset = {
372 let mut manifest_guard = self.manifest.write().await;
373 if let Some(ref mut manifest) = *manifest_guard {
374 let current_offset = manifest.next_vector_offset;
375 let record_size = VECTOR_HEADER_SIZE + (vector_size * 4); manifest.next_vector_offset += record_size as u64;
377 manifest.vector_file_size = manifest.next_vector_offset;
378 current_offset
379 } else {
380 return Err(VectraError::StorageError {
381 message: "Manifest not initialized".to_string()
382 });
383 }
384 };
385
386 Ok(current_offset)
388 }
389
390 async fn get_next_vector_offset_and_mark_dirty(&self, vector_size: usize) -> Result<u64> {
391 let offset = self.get_next_vector_offset(vector_size).await?;
392 self.mark_manifest_dirty().await?;
393 Ok(offset)
394 }
395
396 pub async fn flush(&self) -> Result<()> {
398 self.flush_manifest_if_dirty().await?;
400
401 if let Some(ref mmap) = *self.vector_mmap.read().await {
403 mmap.flush()?;
404 }
405
406 if let Some(ref db) = *self.db.read().await {
408 db.flush()?;
409 }
410
411 Ok(())
412 }
413}
414
415#[async_trait]
416impl StorageBackend for OptimizedStorage {
417 async fn exists(&self) -> bool {
418 self.manifest_path().exists()
419 }
420
421 async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()> {
422 let manifest_path = self.manifest_path();
423
424 if manifest_path.exists() && !config.delete_if_exists {
425 return Err(VectraError::IndexAlreadyExists {
426 path: manifest_path.to_string_lossy().to_string()
427 });
428 }
429
430 if config.delete_if_exists && self.path.exists() {
432 fs::remove_dir_all(&self.path).await.ok();
433 }
434
435 let manifest = Manifest {
436 version: 2,
437 format: "optimized".to_string(),
438 created_at: chrono::Utc::now(),
439 dimensions: None,
440 distance_metric: config.distance_metric.clone(),
441 total_items: 0,
442 vector_file_size: 0,
443 next_vector_offset: 0,
444 };
445
446 self.save_manifest(&manifest).await?;
447
448 self.initialize_storage().await?;
450
451 self.flush_manifest_if_dirty().await?;
453
454 self.create_vector_file(10 * 1024 * 1024).await?;
457
458 Ok(())
459 }
460
461 async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
462 if self.db.read().await.is_none() {
464 self.initialize_storage().await?;
465 }
466
467 let db_guard = self.db.read().await;
468 if let Some(ref db) = *db_guard {
469 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
470 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
471
472 let id_bytes = id.as_bytes();
473
474 if let Some(metadata_bytes) = db.get_cf(metadata_cf, id_bytes)? {
476 let mut item: VectorItem = serde_json::from_slice(&metadata_bytes)?;
477
478 if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
480 let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
481
482 if !vector_record.deleted {
483 item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
485 return Ok(Some(item));
486 }
487 }
488 }
489 }
490
491 Ok(None)
492 }
493
494 async fn insert_item(&mut self, item: &VectorItem) -> Result<()> {
495 if self.db.read().await.is_none() {
497 self.initialize_storage().await?;
498 }
499
500 let start_total = std::time::Instant::now();
502
503 static DEBUG_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
505 let debug_count = DEBUG_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
506 if debug_count % 50 == 0 {
507 println!("DEBUG: insert_item called {} times", debug_count + 1);
508 }
509
510 let dimensions = item.vector.len();
511
512 let mut needs_manifest_update = false;
514 {
515 let mut dims_guard = self.dimensions.write().await;
516 if dims_guard.is_none() {
517 *dims_guard = Some(dimensions);
518 needs_manifest_update = true;
519 } else if let Some(existing_dims) = *dims_guard {
520 if existing_dims != dimensions {
521 return Err(VectraError::VectorValidation {
522 message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, dimensions)
523 });
524 }
525 }
526 }
527
528 if needs_manifest_update {
530 let mut manifest_guard = self.manifest.write().await;
531 if let Some(ref mut manifest) = *manifest_guard {
532 manifest.dimensions = Some(dimensions);
533 self.save_manifest_to_disk(manifest).await?;
535 }
536 }
537
538 let start = std::time::Instant::now();
540 let vector_offset = self.get_next_vector_offset_and_mark_dirty(dimensions).await?;
541 let offset_time = start.elapsed();
542
543 let start = std::time::Instant::now();
545 self.write_vector_to_file(&item.vector, vector_offset).await?;
546 let mmap_time = start.elapsed();
547
548 let db_guard = self.db.read().await;
550 if let Some(ref db) = *db_guard {
551 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
552 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
553
554 let id_bytes = item.id.as_bytes();
555
556 let mut metadata_item = item.clone();
558 metadata_item.vector = Vec::new(); let metadata_bytes = serde_json::to_vec(&metadata_item)?;
560 let mut write_opts = rocksdb::WriteOptions::default();
562 write_opts.disable_wal(true); let start = std::time::Instant::now();
565 db.put_cf_opt(metadata_cf, id_bytes, metadata_bytes, &write_opts)?;
566
567 let vector_record = VectorRecord {
569 id: item.id,
570 offset: vector_offset,
571 dimensions,
572 deleted: false,
573 };
574 let vector_record_bytes = bincode::serialize(&vector_record)?;
575 db.put_cf_opt(vector_index_cf, id_bytes, vector_record_bytes, &write_opts)?;
576 let db_time = start.elapsed();
577
578 let total_items = {
580 let mut manifest_guard = self.manifest.write().await;
581 if let Some(ref mut manifest) = *manifest_guard {
582 manifest.total_items += 1;
583 manifest.total_items
584 } else {
585 return Err(VectraError::StorageError {
586 message: "Manifest not initialized".to_string()
587 });
588 }
589 };
590
591 self.mark_manifest_dirty().await?;
593
594 if total_items == 50 || total_items == 100 || total_items == 200 || total_items == 500 || total_items == 1000 {
596 let total_time = start_total.elapsed();
597 println!(" Storage: After {} items, insert took {} µs (offset: {} µs, mmap: {} µs, db: {} µs, rest: {} µs)",
598 total_items,
599 total_time.as_micros(),
600 offset_time.as_micros(),
601 mmap_time.as_micros(),
602 db_time.as_micros(),
603 total_time.as_micros() - offset_time.as_micros() - mmap_time.as_micros() - db_time.as_micros());
604 }
605 }
606
607 Ok(())
608 }
609
610 async fn insert_items(&mut self, items: &[VectorItem]) -> Result<()> {
611 if items.is_empty() {
612 return Ok(());
613 }
614
615 if self.db.read().await.is_none() {
617 self.initialize_storage().await?;
618 }
619
620 let first_dimensions = items[0].vector.len();
622 for item in items {
623 if item.vector.len() != first_dimensions {
624 return Err(VectraError::VectorValidation {
625 message: format!("All vectors must have same dimensions. Expected {}, got {}", first_dimensions, item.vector.len())
626 });
627 }
628 }
629
630 let mut needs_manifest_update = false;
632 {
633 let mut dims_guard = self.dimensions.write().await;
634 if dims_guard.is_none() {
635 *dims_guard = Some(first_dimensions);
636 needs_manifest_update = true;
637 } else if let Some(existing_dims) = *dims_guard {
638 if existing_dims != first_dimensions {
639 return Err(VectraError::VectorValidation {
640 message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, first_dimensions)
641 });
642 }
643 }
644 }
645
646 if needs_manifest_update {
648 let mut manifest_guard = self.manifest.write().await;
649 if let Some(ref mut manifest) = *manifest_guard {
650 manifest.dimensions = Some(first_dimensions);
651 self.save_manifest_to_disk(manifest).await?;
652 }
653 }
654
655 let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
657 let total_space_needed = items.len() * record_size;
658
659 {
661 let manifest = self.manifest.read().await;
662 if let Some(ref m) = *manifest {
663 let current_offset = m.next_vector_offset;
664 let final_size = current_offset + total_space_needed as u64;
665 drop(manifest); if items.len() >= 2500 {
669 println!("DEBUG: Batch size: {}, Current offset: {}, Final size needed: {}",
670 items.len(), current_offset, final_size);
671 }
672
673 self.ensure_vector_file_capacity(final_size).await?;
674 }
675 }
676
677 let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
679 let mut offsets = Vec::with_capacity(items.len());
680 {
681 let mut manifest_guard = self.manifest.write().await;
682 if let Some(ref mut manifest) = *manifest_guard {
683 let mut current_offset = manifest.next_vector_offset;
684 for _ in 0..items.len() {
685 offsets.push(current_offset);
686 current_offset += record_size as u64;
687 }
688 manifest.next_vector_offset = current_offset;
689 manifest.vector_file_size = current_offset;
690 } else {
691 return Err(VectraError::StorageError {
692 message: "Manifest not initialized".to_string()
693 });
694 }
695 }
696
697 let mut prepared_data = Vec::with_capacity(items.len());
699 for (idx, item) in items.iter().enumerate() {
700 let vector_offset = offsets[idx];
701 self.write_vector_to_file(&item.vector, vector_offset).await?;
702
703 let mut metadata_item = item.clone();
705 metadata_item.vector = Vec::new();
706 let metadata_bytes = serde_json::to_vec(&metadata_item)?;
707
708 let vector_record = VectorRecord {
710 id: item.id,
711 offset: vector_offset,
712 dimensions: first_dimensions,
713 deleted: false,
714 };
715 let vector_record_bytes = bincode::serialize(&vector_record)?;
716
717 prepared_data.push((item.id.as_bytes().to_vec(), metadata_bytes, vector_record_bytes));
718 }
719
720 let total_items_added = prepared_data.len();
722 {
723 let db_guard = self.db.read().await;
724 if let Some(ref db) = *db_guard {
725 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
726 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
727
728 let mut batch = rocksdb::WriteBatch::default();
730
731 for (id_bytes, metadata_bytes, vector_record_bytes) in prepared_data {
732 batch.put_cf(metadata_cf, &id_bytes, metadata_bytes);
733 batch.put_cf(vector_index_cf, &id_bytes, vector_record_bytes);
734 }
735
736 db.write(batch)?;
738 }
739 }
740
741 {
743 let mut manifest_guard = self.manifest.write().await;
744 if let Some(ref mut manifest) = *manifest_guard {
745 manifest.total_items += total_items_added;
746 }
747 }
748
749 self.mark_manifest_dirty().await?;
751
752 Ok(())
753 }
754
755 async fn update_item(&mut self, item: &VectorItem) -> Result<()> {
756 self.delete_item(&item.id).await?;
758 self.insert_item(item).await?;
759 Ok(())
760 }
761
762 async fn delete_item(&mut self, id: &Uuid) -> Result<()> {
763 let db_guard = self.db.read().await;
764 if let Some(ref db) = *db_guard {
765 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
766 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
767
768 let id_bytes = id.as_bytes();
769
770 if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
772 let mut vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
773 vector_record.deleted = true;
774 let updated_bytes = bincode::serialize(&vector_record)?;
775 db.put_cf(vector_index_cf, id_bytes, updated_bytes)?;
776 }
777
778 db.delete_cf(metadata_cf, id_bytes)?;
780
781 let should_mark_dirty = {
783 let mut manifest_guard = self.manifest.write().await;
784 if let Some(ref mut manifest) = *manifest_guard {
785 if manifest.total_items > 0 {
786 manifest.total_items -= 1;
787 true
788 } else {
789 false
790 }
791 } else {
792 false
793 }
794 };
795
796 if should_mark_dirty {
798 self.mark_manifest_dirty().await?;
799 }
800 }
801
802 Ok(())
803 }
804
805 async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
806 if self.db.read().await.is_none() {
808 self.initialize_storage().await?;
809 }
810
811 let metadata_records = {
813 let db_guard = self.db.read().await;
814 if let Some(ref db) = *db_guard {
815 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
816 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
817
818 let mut records = Vec::new();
819 let iter = db.iterator_cf(metadata_cf, rocksdb::IteratorMode::Start);
820
821 for item in iter {
822 let (key, value) = item?;
823
824 if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, &key)? {
826 let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
827
828 if !vector_record.deleted {
829 let metadata_item: VectorItem = serde_json::from_slice(&value)?;
830 records.push((metadata_item, vector_record));
831
832 if let Some(ref opts) = options {
834 if let Some(limit) = opts.limit {
835 if records.len() >= limit {
836 break;
837 }
838 }
839 }
840 }
841 }
842 }
843
844 records
845 } else {
846 Vec::new()
847 }
848 };
849
850 let mut items = Vec::new();
852 for (mut metadata_item, vector_record) in metadata_records {
853 metadata_item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
855 items.push(metadata_item);
856 }
857
858 Ok(items)
859 }
860
861 async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>> {
862 if let Some(ref query_vector) = query.vector {
863 let all_items = self.list_items(None).await?;
866 let mut results = Vec::new();
867
868 for item in all_items {
869 if item.vector.len() == query_vector.len() {
870 let similarity = VectorOps::cosine_similarity(query_vector, &item.vector);
871 results.push(QueryResult {
872 item,
873 score: similarity,
874 });
875 }
876 }
877
878 results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
880
881 results.truncate(query.top_k);
883
884 Ok(results)
885 } else {
886 Ok(Vec::new())
887 }
888 }
889
    /// Starts a transaction. This backend stages nothing, so the call does no
    /// work and always succeeds.
    async fn begin_transaction(&mut self) -> Result<()> {
        Ok(())
    }
895
896 async fn commit_transaction(&mut self) -> Result<()> {
897 self.flush_manifest_if_dirty().await?;
899
900 let db_guard = self.db.read().await;
902 if let Some(ref db) = *db_guard {
903 db.flush()?;
904 }
905
906 if let Some(ref mut mmap_guard) = *self.vector_mmap.write().await {
907 mmap_guard.flush()?;
908 }
909
910 Ok(())
911 }
912
    /// Rolls back a transaction. The body does no work and always succeeds.
    /// NOTE(review): writes performed since `begin_transaction` are therefore
    /// presumably not undone — confirm with callers.
    async fn rollback_transaction(&mut self) -> Result<()> {
        Ok(())
    }
918
919 async fn delete_index(&mut self) -> Result<()> {
920 *self.db.write().await = None;
922 *self.vector_file.write().await = None;
923 *self.vector_mmap.write().await = None;
924 *self.manifest.write().await = None;
925
926 if self.path.exists() {
928 fs::remove_dir_all(&self.path).await?;
929 }
930 Ok(())
931 }
932
933 async fn get_stats(&self) -> Result<IndexStats> {
934 if let Some(manifest) = self.load_manifest().await? {
935 let size = if self.path.exists() {
936 let mut total_size = 0u64;
938 let mut entries = fs::read_dir(&self.path).await?;
939 while let Some(entry) = entries.next_entry().await? {
940 if let Ok(metadata) = entry.metadata().await {
941 total_size += metadata.len();
942 }
943 }
944 total_size
945 } else {
946 0
947 };
948
949 Ok(IndexStats {
950 items: manifest.total_items,
951 size,
952 dimensions: manifest.dimensions,
953 distance_metric: manifest.distance_metric,
954 })
955 } else {
956 Ok(IndexStats {
957 items: 0,
958 size: 0,
959 dimensions: None,
960 distance_metric: DistanceMetric::Cosine,
961 })
962 }
963 }
964}
965
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use vectrust_core::StorageBackend;

    /// Storage handle over a fresh temporary directory. The `TempDir` is
    /// returned as well so the directory outlives the test body.
    fn scratch_storage() -> (TempDir, OptimizedStorage) {
        let dir = TempDir::new().unwrap();
        let storage = OptimizedStorage::new(dir.path()).unwrap();
        (dir, storage)
    }

    /// Item with a random id, the given vector and a one-field JSON payload.
    fn sample_item(vector: Vec<f32>, metadata: serde_json::Value) -> VectorItem {
        VectorItem {
            id: Uuid::new_v4(),
            vector,
            metadata,
            ..Default::default()
        }
    }

    /// An index only "exists" once `create_index` has run.
    #[tokio::test]
    async fn test_optimized_storage_creation() {
        let (_dir, mut storage) = scratch_storage();
        assert!(!storage.exists().await);

        storage
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();
        assert!(storage.exists().await);
    }

    /// An inserted item can be fetched back with identical id and vector.
    #[tokio::test]
    async fn test_optimized_storage_insert_and_get() {
        let (_dir, mut storage) = scratch_storage();
        storage
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();

        let original = sample_item(vec![1.0, 0.0, 0.0], serde_json::json!({"test": "data"}));
        storage.insert_item(&original).await.unwrap();

        let fetched = storage.get_item(&original.id).await.unwrap();
        assert!(fetched.is_some());

        let fetched = fetched.unwrap();
        assert_eq!(fetched.id, original.id);
        assert_eq!(fetched.vector, original.vector);
    }

    /// The item closest to the query vector is ranked first.
    #[tokio::test]
    async fn test_optimized_storage_query() {
        let (_dir, mut storage) = scratch_storage();
        storage
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();

        let along_x = sample_item(vec![1.0, 0.0, 0.0], serde_json::json!({"name": "item1"}));
        let along_y = sample_item(vec![0.0, 1.0, 0.0], serde_json::json!({"name": "item2"}));
        storage.insert_item(&along_x).await.unwrap();
        storage.insert_item(&along_y).await.unwrap();

        let query = Query {
            vector: Some(vec![1.0, 0.1, 0.0]),
            text: None,
            top_k: 2,
            filter: None,
        };

        let hits = storage.query_items(&query).await.unwrap();
        assert_eq!(hits.len(), 2);

        assert_eq!(hits[0].item.id, along_x.id);
        assert!(hits[0].score > hits[1].score);
    }
}
1049}