1use async_trait::async_trait;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use tokio::fs;
5use tokio::sync::RwLock;
6use vectrust_core::*;
7use uuid::Uuid;
8use serde::{Serialize, Deserialize};
9use rocksdb::{DB, Options};
10use memmap2::{MmapMut, MmapOptions};
11use std::fs::OpenOptions;
12use std::io::{Seek, SeekFrom, Write};
13use bincode;
14
15pub struct OptimizedStorage {
17 path: PathBuf,
18 db: Arc<RwLock<Option<DB>>>,
19 vector_file: Arc<RwLock<Option<std::fs::File>>>,
20 vector_mmap: Arc<RwLock<Option<MmapMut>>>,
21 manifest: Arc<RwLock<Option<Manifest>>>,
22 dimensions: Arc<RwLock<Option<usize>>>,
23 manifest_dirty: Arc<RwLock<bool>>,
25 operations_since_save: Arc<RwLock<u32>>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Manifest {
30 pub version: u32,
31 pub format: String,
32 pub created_at: chrono::DateTime<chrono::Utc>,
33 pub dimensions: Option<usize>,
34 pub distance_metric: DistanceMetric,
35 pub total_items: usize,
36 pub vector_file_size: u64,
37 pub next_vector_offset: u64,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41struct VectorRecord {
42 pub id: Uuid,
43 pub offset: u64,
44 pub dimensions: usize,
45 pub deleted: bool,
46}
47
/// RocksDB column family holding the JSON-serialized item metadata.
const METADATA_CF: &str = "metadata";
/// RocksDB column family mapping item ids to their `VectorRecord`.
const VECTOR_INDEX_CF: &str = "vector_index";
/// Size in bytes of the per-record header (the dimension count as a little-endian `u64`).
const VECTOR_HEADER_SIZE: usize = 8;
/// Number of dirty-marking operations after which the manifest is flushed to disk.
const MANIFEST_SAVE_INTERVAL: u32 = 100;

impl OptimizedStorage {
55 pub fn new(path: &Path) -> Result<Self> {
56 Ok(Self {
57 path: path.to_path_buf(),
58 db: Arc::new(RwLock::new(None)),
59 vector_file: Arc::new(RwLock::new(None)),
60 vector_mmap: Arc::new(RwLock::new(None)),
61 manifest: Arc::new(RwLock::new(None)),
62 dimensions: Arc::new(RwLock::new(None)),
63 manifest_dirty: Arc::new(RwLock::new(false)),
64 operations_since_save: Arc::new(RwLock::new(0)),
65 })
66 }
67
68 async fn initialize_storage(&self) -> Result<()> {
69 if !self.path.exists() {
71 std::fs::create_dir_all(&self.path)?;
72 }
73
74 let db_path = self.path.join("metadata");
76 let mut db_opts = Options::default();
77 db_opts.create_if_missing(true);
78 db_opts.create_missing_column_families(true);
79
80 db_opts.set_max_write_buffer_number(4);
82 db_opts.set_write_buffer_size(64 * 1024 * 1024); db_opts.set_target_file_size_base(64 * 1024 * 1024); db_opts.set_level_compaction_dynamic_level_bytes(true);
85 db_opts.set_max_bytes_for_level_base(256 * 1024 * 1024); db_opts.set_max_background_jobs(4);
87 db_opts.set_bytes_per_sync(64 * 1024 * 1024); let cf_names = vec![METADATA_CF, VECTOR_INDEX_CF];
92 let db = DB::open_cf(&db_opts, db_path, cf_names)?;
93
94 *self.db.write().await = Some(db);
95
96 if let Some(manifest) = self.load_manifest().await? {
98 *self.manifest.write().await = Some(manifest.clone());
99 *self.dimensions.write().await = manifest.dimensions;
100
101 let vector_path = self.path.join("vectors.dat");
103 if vector_path.exists() {
104 let file = OpenOptions::new()
105 .read(true)
106 .write(true)
107 .open(&vector_path)?;
108
109 let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
110
111 *self.vector_file.write().await = Some(file);
112 *self.vector_mmap.write().await = Some(mmap);
113 }
114 }
115
116 Ok(())
117 }
118
119 async fn create_vector_file(&self, initial_size: u64) -> Result<()> {
120 let vector_path = self.path.join("vectors.dat");
121
122 let mut file = OpenOptions::new()
123 .read(true)
124 .write(true)
125 .create(true)
126 .truncate(true)
127 .open(&vector_path)?;
128
129 file.seek(SeekFrom::Start(initial_size - 1))?;
131 file.write_all(&[0])?;
132 file.flush()?;
133
134 let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
135
136 *self.vector_file.write().await = Some(file);
137 *self.vector_mmap.write().await = Some(mmap);
138
139 Ok(())
140 }
141
142 async fn write_vector_to_file(&self, vector: &[f32], offset: u64) -> Result<()> {
143 let mut mmap_guard = self.vector_mmap.write().await;
144 if let Some(ref mut mmap) = *mmap_guard {
145 let start = offset as usize;
146 let dimensions = vector.len();
147
148 let end_pos = start + VECTOR_HEADER_SIZE + (dimensions * 4);
150 let mmap_len = mmap.len();
151 if end_pos > mmap_len {
152 return Err(VectraError::StorageError {
153 message: format!("Memory map too small: need {} bytes, have {} bytes",
154 end_pos, mmap_len)
155 });
156 }
157
158 let dim_bytes = (dimensions as u64).to_le_bytes();
160 mmap[start..start + 8].copy_from_slice(&dim_bytes);
161
162 let vector_start = start + VECTOR_HEADER_SIZE;
164 for (i, &value) in vector.iter().enumerate() {
165 let value_bytes = value.to_le_bytes();
166 let pos = vector_start + (i * 4);
167 mmap[pos..pos + 4].copy_from_slice(&value_bytes);
168 }
169
170 }
173
174 Ok(())
175 }
176
177 async fn read_vector_from_file(&self, offset: u64, expected_dims: usize) -> Result<Vec<f32>> {
178 let mmap_guard = self.vector_mmap.read().await;
179 if let Some(ref mmap) = *mmap_guard {
180 let start = offset as usize;
181
182 let mut dim_bytes = [0u8; 8];
184 dim_bytes.copy_from_slice(&mmap[start..start + 8]);
185 let dimensions = u64::from_le_bytes(dim_bytes) as usize;
186
187 if dimensions != expected_dims {
188 return Err(VectraError::VectorValidation {
189 message: format!("Dimension mismatch: expected {}, got {}", expected_dims, dimensions)
190 });
191 }
192
193 let mut vector = Vec::with_capacity(dimensions);
195 let vector_start = start + VECTOR_HEADER_SIZE;
196
197 for i in 0..dimensions {
198 let pos = vector_start + (i * 4);
199 let mut value_bytes = [0u8; 4];
200 value_bytes.copy_from_slice(&mmap[pos..pos + 4]);
201 vector.push(f32::from_le_bytes(value_bytes));
202 }
203
204 Ok(vector)
205 } else {
206 Err(VectraError::StorageError {
207 message: "Vector file not initialized".to_string()
208 })
209 }
210 }
211
212 fn manifest_path(&self) -> PathBuf {
213 self.path.join("manifest.json")
214 }
215
216 async fn load_manifest(&self) -> Result<Option<Manifest>> {
217 let manifest_path = self.manifest_path();
218
219 if !manifest_path.exists() {
220 return Ok(None);
221 }
222
223 let content = fs::read_to_string(manifest_path).await?;
224 let manifest: Manifest = serde_json::from_str(&content)?;
225 Ok(Some(manifest))
226 }
227
228 async fn save_manifest_to_disk(&self, manifest: &Manifest) -> Result<()> {
229 let manifest_path = self.manifest_path();
230
231 static SAVE_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
233 let count = SAVE_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
234 if count % 10 == 0 {
235 println!("DEBUG: Manifest save #{}, total_items: {}", count + 1, manifest.total_items);
236 }
237
238 if let Some(parent) = manifest_path.parent() {
240 fs::create_dir_all(parent).await?;
241 }
242
243 let start = std::time::Instant::now();
244 let content = serde_json::to_string_pretty(manifest)?;
245 let json_time = start.elapsed();
246
247 let start = std::time::Instant::now();
248 fs::write(manifest_path, content).await?;
249 let write_time = start.elapsed();
250
251 if count % 10 == 0 {
252 println!(" JSON serialize: {} µs, Disk write: {} µs",
253 json_time.as_micros(), write_time.as_micros());
254 }
255
256 Ok(())
257 }
258
259 async fn save_manifest(&self, manifest: &Manifest) -> Result<()> {
260 self.save_manifest_to_disk(manifest).await?;
261
262 *self.manifest.write().await = Some(manifest.clone());
264
265 Ok(())
266 }
267
268 async fn mark_manifest_dirty(&self) -> Result<()> {
270 *self.manifest_dirty.write().await = true;
271
272 let mut ops_count = self.operations_since_save.write().await;
273 *ops_count += 1;
274
275 if *ops_count >= MANIFEST_SAVE_INTERVAL {
277 drop(ops_count); self.flush_manifest_if_dirty().await?;
279 }
280
281 Ok(())
282 }
283
284 async fn flush_manifest_if_dirty(&self) -> Result<()> {
286 let is_dirty = {
287 let dirty = self.manifest_dirty.read().await;
288 *dirty
289 };
290
291 if is_dirty {
292 let manifest = {
293 let manifest_guard = self.manifest.read().await;
294 manifest_guard.clone()
295 };
296
297 if let Some(manifest) = manifest {
298 self.save_manifest_to_disk(&manifest).await?;
299 *self.manifest_dirty.write().await = false;
300 *self.operations_since_save.write().await = 0;
301 }
302 }
303
304 Ok(())
305 }
306
307 async fn ensure_vector_file_capacity(&self, needed_size: u64) -> Result<()> {
308 let vector_path = self.path.join("vectors.dat");
309
310 if !vector_path.exists() {
312 return Err(VectraError::StorageError {
313 message: "Vector file does not exist".to_string()
314 });
315 }
316
317 let metadata = tokio::fs::metadata(&vector_path).await?;
319 let current_size = metadata.len();
320
321 if needed_size > current_size {
322 let new_size = std::cmp::max(
324 (current_size as f64 * 1.5) as u64,
325 needed_size + (10 * 1024 * 1024) );
327
328 println!(" 📈 Growing vector file from {} MB to {} MB",
329 current_size / 1024 / 1024,
330 new_size / 1024 / 1024);
331
332 {
335 let mut mmap_guard = self.vector_mmap.write().await;
336 if let Some(ref mut mmap) = *mmap_guard {
337 mmap.flush()?;
338 }
339 *mmap_guard = None; }
341
342 {
344 let mut file_guard = self.vector_file.write().await;
345 *file_guard = None;
346 }
347
348 let file = OpenOptions::new()
350 .read(true)
351 .write(true)
352 .open(&vector_path)?;
353
354 file.set_len(new_size)?;
355 file.sync_all()?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
359
360 *self.vector_file.write().await = Some(file);
362 *self.vector_mmap.write().await = Some(mmap);
363
364 println!(" ✅ Vector file resized successfully");
365 }
366
367 Ok(())
368 }
369
370 async fn get_next_vector_offset(&self, vector_size: usize) -> Result<u64> {
371 let current_offset = {
372 let mut manifest_guard = self.manifest.write().await;
373 if let Some(ref mut manifest) = *manifest_guard {
374 let current_offset = manifest.next_vector_offset;
375 let record_size = VECTOR_HEADER_SIZE + (vector_size * 4); manifest.next_vector_offset += record_size as u64;
377 manifest.vector_file_size = manifest.next_vector_offset;
378 current_offset
379 } else {
380 return Err(VectraError::StorageError {
381 message: "Manifest not initialized".to_string()
382 });
383 }
384 };
385
386 Ok(current_offset)
388 }
389
390 async fn get_next_vector_offset_and_mark_dirty(&self, vector_size: usize) -> Result<u64> {
391 let offset = self.get_next_vector_offset(vector_size).await?;
392 self.mark_manifest_dirty().await?;
393 Ok(offset)
394 }
395
396 pub async fn flush(&self) -> Result<()> {
398 self.flush_manifest_if_dirty().await?;
400
401 if let Some(ref mmap) = *self.vector_mmap.read().await {
403 mmap.flush()?;
404 }
405
406 if let Some(ref db) = *self.db.read().await {
408 db.flush()?;
409 }
410
411 Ok(())
412 }
413}
414
415#[async_trait]
416impl StorageBackend for OptimizedStorage {
417 async fn exists(&self) -> bool {
418 self.manifest_path().exists()
419 }
420
421 async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()> {
422 let manifest_path = self.manifest_path();
423
424 if manifest_path.exists() && !config.delete_if_exists {
425 return Err(VectraError::IndexAlreadyExists {
426 path: manifest_path.to_string_lossy().to_string()
427 });
428 }
429
430 if config.delete_if_exists && self.path.exists() {
432 fs::remove_dir_all(&self.path).await.ok();
433 }
434
435 let manifest = Manifest {
436 version: 2,
437 format: "optimized".to_string(),
438 created_at: chrono::Utc::now(),
439 dimensions: None,
440 distance_metric: config.distance_metric.clone(),
441 total_items: 0,
442 vector_file_size: 0,
443 next_vector_offset: 0,
444 };
445
446 self.save_manifest(&manifest).await?;
447
448 self.initialize_storage().await?;
450
451 self.flush_manifest_if_dirty().await?;
453
454 self.create_vector_file(10 * 1024 * 1024).await?;
457
458 Ok(())
459 }
460
461 async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
462 let db_guard = self.db.read().await;
463 if let Some(ref db) = *db_guard {
464 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
465 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
466
467 let id_bytes = id.as_bytes();
468
469 if let Some(metadata_bytes) = db.get_cf(metadata_cf, id_bytes)? {
471 let mut item: VectorItem = serde_json::from_slice(&metadata_bytes)?;
472
473 if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
475 let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
476
477 if !vector_record.deleted {
478 item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
480 return Ok(Some(item));
481 }
482 }
483 }
484 }
485
486 Ok(None)
487 }
488
489 async fn insert_item(&mut self, item: &VectorItem) -> Result<()> {
490 if self.db.read().await.is_none() {
492 self.initialize_storage().await?;
493 }
494
495 let start_total = std::time::Instant::now();
497
498 static DEBUG_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
500 let debug_count = DEBUG_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
501 if debug_count % 50 == 0 {
502 println!("DEBUG: insert_item called {} times", debug_count + 1);
503 }
504
505 let dimensions = item.vector.len();
506
507 let mut needs_manifest_update = false;
509 {
510 let mut dims_guard = self.dimensions.write().await;
511 if dims_guard.is_none() {
512 *dims_guard = Some(dimensions);
513 needs_manifest_update = true;
514 } else if let Some(existing_dims) = *dims_guard {
515 if existing_dims != dimensions {
516 return Err(VectraError::VectorValidation {
517 message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, dimensions)
518 });
519 }
520 }
521 }
522
523 if needs_manifest_update {
525 let mut manifest_guard = self.manifest.write().await;
526 if let Some(ref mut manifest) = *manifest_guard {
527 manifest.dimensions = Some(dimensions);
528 self.save_manifest_to_disk(manifest).await?;
530 }
531 }
532
533 let start = std::time::Instant::now();
535 let vector_offset = self.get_next_vector_offset_and_mark_dirty(dimensions).await?;
536 let offset_time = start.elapsed();
537
538 let start = std::time::Instant::now();
540 self.write_vector_to_file(&item.vector, vector_offset).await?;
541 let mmap_time = start.elapsed();
542
543 let db_guard = self.db.read().await;
545 if let Some(ref db) = *db_guard {
546 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
547 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
548
549 let id_bytes = item.id.as_bytes();
550
551 let mut metadata_item = item.clone();
553 metadata_item.vector = Vec::new(); let metadata_bytes = serde_json::to_vec(&metadata_item)?;
555 let mut write_opts = rocksdb::WriteOptions::default();
557 write_opts.disable_wal(true); let start = std::time::Instant::now();
560 db.put_cf_opt(metadata_cf, id_bytes, metadata_bytes, &write_opts)?;
561
562 let vector_record = VectorRecord {
564 id: item.id,
565 offset: vector_offset,
566 dimensions,
567 deleted: false,
568 };
569 let vector_record_bytes = bincode::serialize(&vector_record)?;
570 db.put_cf_opt(vector_index_cf, id_bytes, vector_record_bytes, &write_opts)?;
571 let db_time = start.elapsed();
572
573 let total_items = {
575 let mut manifest_guard = self.manifest.write().await;
576 if let Some(ref mut manifest) = *manifest_guard {
577 manifest.total_items += 1;
578 manifest.total_items
579 } else {
580 return Err(VectraError::StorageError {
581 message: "Manifest not initialized".to_string()
582 });
583 }
584 };
585
586 self.mark_manifest_dirty().await?;
588
589 if total_items == 50 || total_items == 100 || total_items == 200 || total_items == 500 || total_items == 1000 {
591 let total_time = start_total.elapsed();
592 println!(" Storage: After {} items, insert took {} µs (offset: {} µs, mmap: {} µs, db: {} µs, rest: {} µs)",
593 total_items,
594 total_time.as_micros(),
595 offset_time.as_micros(),
596 mmap_time.as_micros(),
597 db_time.as_micros(),
598 total_time.as_micros() - offset_time.as_micros() - mmap_time.as_micros() - db_time.as_micros());
599 }
600 }
601
602 Ok(())
603 }
604
605 async fn insert_items(&mut self, items: &[VectorItem]) -> Result<()> {
606 if items.is_empty() {
607 return Ok(());
608 }
609
610 if self.db.read().await.is_none() {
612 self.initialize_storage().await?;
613 }
614
615 let first_dimensions = items[0].vector.len();
617 for item in items {
618 if item.vector.len() != first_dimensions {
619 return Err(VectraError::VectorValidation {
620 message: format!("All vectors must have same dimensions. Expected {}, got {}", first_dimensions, item.vector.len())
621 });
622 }
623 }
624
625 let mut needs_manifest_update = false;
627 {
628 let mut dims_guard = self.dimensions.write().await;
629 if dims_guard.is_none() {
630 *dims_guard = Some(first_dimensions);
631 needs_manifest_update = true;
632 } else if let Some(existing_dims) = *dims_guard {
633 if existing_dims != first_dimensions {
634 return Err(VectraError::VectorValidation {
635 message: format!("Vector dimension mismatch: expected {}, got {}", existing_dims, first_dimensions)
636 });
637 }
638 }
639 }
640
641 if needs_manifest_update {
643 let mut manifest_guard = self.manifest.write().await;
644 if let Some(ref mut manifest) = *manifest_guard {
645 manifest.dimensions = Some(first_dimensions);
646 self.save_manifest_to_disk(manifest).await?;
647 }
648 }
649
650 let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
652 let total_space_needed = items.len() * record_size;
653
654 {
656 let manifest = self.manifest.read().await;
657 if let Some(ref m) = *manifest {
658 let current_offset = m.next_vector_offset;
659 let final_size = current_offset + total_space_needed as u64;
660 drop(manifest); if items.len() >= 2500 {
664 println!("DEBUG: Batch size: {}, Current offset: {}, Final size needed: {}",
665 items.len(), current_offset, final_size);
666 }
667
668 self.ensure_vector_file_capacity(final_size).await?;
669 }
670 }
671
672 let record_size = VECTOR_HEADER_SIZE + (first_dimensions * 4);
674 let mut offsets = Vec::with_capacity(items.len());
675 {
676 let mut manifest_guard = self.manifest.write().await;
677 if let Some(ref mut manifest) = *manifest_guard {
678 let mut current_offset = manifest.next_vector_offset;
679 for _ in 0..items.len() {
680 offsets.push(current_offset);
681 current_offset += record_size as u64;
682 }
683 manifest.next_vector_offset = current_offset;
684 manifest.vector_file_size = current_offset;
685 } else {
686 return Err(VectraError::StorageError {
687 message: "Manifest not initialized".to_string()
688 });
689 }
690 }
691
692 let mut prepared_data = Vec::with_capacity(items.len());
694 for (idx, item) in items.iter().enumerate() {
695 let vector_offset = offsets[idx];
696 self.write_vector_to_file(&item.vector, vector_offset).await?;
697
698 let mut metadata_item = item.clone();
700 metadata_item.vector = Vec::new();
701 let metadata_bytes = serde_json::to_vec(&metadata_item)?;
702
703 let vector_record = VectorRecord {
705 id: item.id,
706 offset: vector_offset,
707 dimensions: first_dimensions,
708 deleted: false,
709 };
710 let vector_record_bytes = bincode::serialize(&vector_record)?;
711
712 prepared_data.push((item.id.as_bytes().to_vec(), metadata_bytes, vector_record_bytes));
713 }
714
715 let total_items_added = prepared_data.len();
717 {
718 let db_guard = self.db.read().await;
719 if let Some(ref db) = *db_guard {
720 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
721 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
722
723 let mut batch = rocksdb::WriteBatch::default();
725
726 for (id_bytes, metadata_bytes, vector_record_bytes) in prepared_data {
727 batch.put_cf(metadata_cf, &id_bytes, metadata_bytes);
728 batch.put_cf(vector_index_cf, &id_bytes, vector_record_bytes);
729 }
730
731 db.write(batch)?;
733 }
734 }
735
736 {
738 let mut manifest_guard = self.manifest.write().await;
739 if let Some(ref mut manifest) = *manifest_guard {
740 manifest.total_items += total_items_added;
741 }
742 }
743
744 self.mark_manifest_dirty().await?;
746
747 Ok(())
748 }
749
750 async fn update_item(&mut self, item: &VectorItem) -> Result<()> {
751 self.delete_item(&item.id).await?;
753 self.insert_item(item).await?;
754 Ok(())
755 }
756
757 async fn delete_item(&mut self, id: &Uuid) -> Result<()> {
758 let db_guard = self.db.read().await;
759 if let Some(ref db) = *db_guard {
760 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
761 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
762
763 let id_bytes = id.as_bytes();
764
765 if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, id_bytes)? {
767 let mut vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
768 vector_record.deleted = true;
769 let updated_bytes = bincode::serialize(&vector_record)?;
770 db.put_cf(vector_index_cf, id_bytes, updated_bytes)?;
771 }
772
773 db.delete_cf(metadata_cf, id_bytes)?;
775
776 let should_mark_dirty = {
778 let mut manifest_guard = self.manifest.write().await;
779 if let Some(ref mut manifest) = *manifest_guard {
780 if manifest.total_items > 0 {
781 manifest.total_items -= 1;
782 true
783 } else {
784 false
785 }
786 } else {
787 false
788 }
789 };
790
791 if should_mark_dirty {
793 self.mark_manifest_dirty().await?;
794 }
795 }
796
797 Ok(())
798 }
799
800 async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
801 if self.db.read().await.is_none() {
803 self.initialize_storage().await?;
804 }
805
806 let metadata_records = {
808 let db_guard = self.db.read().await;
809 if let Some(ref db) = *db_guard {
810 let metadata_cf = db.cf_handle(METADATA_CF).unwrap();
811 let vector_index_cf = db.cf_handle(VECTOR_INDEX_CF).unwrap();
812
813 let mut records = Vec::new();
814 let iter = db.iterator_cf(metadata_cf, rocksdb::IteratorMode::Start);
815
816 for item in iter {
817 let (key, value) = item?;
818
819 if let Some(vector_record_bytes) = db.get_cf(vector_index_cf, &key)? {
821 let vector_record: VectorRecord = bincode::deserialize(&vector_record_bytes)?;
822
823 if !vector_record.deleted {
824 let metadata_item: VectorItem = serde_json::from_slice(&value)?;
825 records.push((metadata_item, vector_record));
826
827 if let Some(ref opts) = options {
829 if let Some(limit) = opts.limit {
830 if records.len() >= limit {
831 break;
832 }
833 }
834 }
835 }
836 }
837 }
838
839 records
840 } else {
841 Vec::new()
842 }
843 };
844
845 let mut items = Vec::new();
847 for (mut metadata_item, vector_record) in metadata_records {
848 metadata_item.vector = self.read_vector_from_file(vector_record.offset, vector_record.dimensions).await?;
850 items.push(metadata_item);
851 }
852
853 Ok(items)
854 }
855
856 async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>> {
857 if let Some(ref query_vector) = query.vector {
858 let all_items = self.list_items(None).await?;
861 let mut results = Vec::new();
862
863 for item in all_items {
864 if item.vector.len() == query_vector.len() {
865 let similarity = VectorOps::cosine_similarity(query_vector, &item.vector);
866 results.push(QueryResult {
867 item,
868 score: similarity,
869 });
870 }
871 }
872
873 results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
875
876 results.truncate(query.top_k);
878
879 Ok(results)
880 } else {
881 Ok(Vec::new())
882 }
883 }
884
885 async fn begin_transaction(&mut self) -> Result<()> {
886 Ok(())
889 }
890
891 async fn commit_transaction(&mut self) -> Result<()> {
892 self.flush_manifest_if_dirty().await?;
894
895 let db_guard = self.db.read().await;
897 if let Some(ref db) = *db_guard {
898 db.flush()?;
899 }
900
901 if let Some(ref mut mmap_guard) = *self.vector_mmap.write().await {
902 mmap_guard.flush()?;
903 }
904
905 Ok(())
906 }
907
908 async fn rollback_transaction(&mut self) -> Result<()> {
909 Ok(())
912 }
913
914 async fn delete_index(&mut self) -> Result<()> {
915 *self.db.write().await = None;
917 *self.vector_file.write().await = None;
918 *self.vector_mmap.write().await = None;
919 *self.manifest.write().await = None;
920
921 if self.path.exists() {
923 fs::remove_dir_all(&self.path).await?;
924 }
925 Ok(())
926 }
927
928 async fn get_stats(&self) -> Result<IndexStats> {
929 if let Some(manifest) = self.load_manifest().await? {
930 let size = if self.path.exists() {
931 let mut total_size = 0u64;
933 let mut entries = fs::read_dir(&self.path).await?;
934 while let Some(entry) = entries.next_entry().await? {
935 if let Ok(metadata) = entry.metadata().await {
936 total_size += metadata.len();
937 }
938 }
939 total_size
940 } else {
941 0
942 };
943
944 Ok(IndexStats {
945 items: manifest.total_items,
946 size,
947 dimensions: manifest.dimensions,
948 distance_metric: manifest.distance_metric,
949 })
950 } else {
951 Ok(IndexStats {
952 items: 0,
953 size: 0,
954 dimensions: None,
955 distance_metric: DistanceMetric::Cosine,
956 })
957 }
958 }
959}
960
#[cfg(test)]
mod tests {
    use super::*;
    use vectrust_core::StorageBackend;
    use tempfile::TempDir;

    /// A fresh directory holds no index until `create_index` has run.
    #[tokio::test]
    async fn test_optimized_storage_creation() {
        let workspace = TempDir::new().unwrap();
        let mut backend = OptimizedStorage::new(workspace.path()).unwrap();

        assert!(!backend.exists().await);

        backend
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();

        assert!(backend.exists().await);
    }

    /// An inserted item can be fetched by id with its vector intact.
    #[tokio::test]
    async fn test_optimized_storage_insert_and_get() {
        let workspace = TempDir::new().unwrap();
        let mut backend = OptimizedStorage::new(workspace.path()).unwrap();

        backend
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();

        let stored = VectorItem {
            id: Uuid::new_v4(),
            vector: vec![1.0, 0.0, 0.0],
            metadata: serde_json::json!({"test": "data"}),
            ..Default::default()
        };

        backend.insert_item(&stored).await.unwrap();

        let lookup = backend.get_item(&stored.id).await.unwrap();
        assert!(lookup.is_some());

        let fetched = lookup.unwrap();
        assert_eq!(fetched.id, stored.id);
        assert_eq!(fetched.vector, stored.vector);
    }

    /// The item closest to the query vector is ranked first.
    #[tokio::test]
    async fn test_optimized_storage_query() {
        let workspace = TempDir::new().unwrap();
        let mut backend = OptimizedStorage::new(workspace.path()).unwrap();

        backend
            .create_index(&CreateIndexConfig::default())
            .await
            .unwrap();

        let along_x = VectorItem {
            id: Uuid::new_v4(),
            vector: vec![1.0, 0.0, 0.0],
            metadata: serde_json::json!({"name": "item1"}),
            ..Default::default()
        };

        let along_y = VectorItem {
            id: Uuid::new_v4(),
            vector: vec![0.0, 1.0, 0.0],
            metadata: serde_json::json!({"name": "item2"}),
            ..Default::default()
        };

        backend.insert_item(&along_x).await.unwrap();
        backend.insert_item(&along_y).await.unwrap();

        let request = Query {
            vector: Some(vec![1.0, 0.1, 0.0]),
            text: None,
            top_k: 2,
            filter: None,
        };

        let hits = backend.query_items(&request).await.unwrap();
        assert_eq!(hits.len(), 2);

        // The query points almost along the x axis, so `along_x` must win.
        assert_eq!(hits[0].item.id, along_x.id);
        assert!(hits[0].score > hits[1].score);
    }
}