oxirs_vec/diskann/
storage.rs

1//! Storage backends for DiskANN
2//!
3//! Provides abstractions for storing vectors and graph structures on disk
4//! with support for memory-mapped I/O and buffered access.
5//!
6//! ## Storage Layout
7//! - Vectors: Raw f32 arrays or PQ-compressed codes
8//! - Graph: Adjacency lists with neighbor IDs
9//! - Metadata: Index configuration and statistics
10//!
11//! ## Backends
12//! - **DiskStorage**: Standard file I/O with buffering
13//! - **MemoryMappedStorage**: Memory-mapped files for fast access
14//! - **CachedStorage**: Hybrid with LRU caching
15
16use crate::diskann::config::DiskAnnConfig;
17use crate::diskann::graph::VamanaGraph;
18use crate::diskann::types::{DiskAnnError, DiskAnnResult, VectorId};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::fs::{File, OpenOptions};
22use std::io::{BufReader, BufWriter, Read, Write};
23use std::path::{Path, PathBuf};
24
25/// Storage backend trait
26pub trait StorageBackend: Send + Sync {
27    /// Write a vector to storage
28    fn write_vector(&mut self, vector_id: &VectorId, vector: &[f32]) -> DiskAnnResult<()>;
29
30    /// Read a vector from storage
31    fn read_vector(&self, vector_id: &VectorId) -> DiskAnnResult<Vec<f32>>;
32
33    /// Write graph structure
34    fn write_graph(&mut self, graph: &VamanaGraph) -> DiskAnnResult<()>;
35
36    /// Read graph structure
37    fn read_graph(&self) -> DiskAnnResult<VamanaGraph>;
38
39    /// Write metadata
40    fn write_metadata(&mut self, metadata: &StorageMetadata) -> DiskAnnResult<()>;
41
42    /// Read metadata
43    fn read_metadata(&self) -> DiskAnnResult<StorageMetadata>;
44
45    /// Delete all data
46    fn clear(&mut self) -> DiskAnnResult<()>;
47
48    /// Flush any pending writes
49    fn flush(&mut self) -> DiskAnnResult<()>;
50
51    /// Get storage size in bytes
52    fn size(&self) -> DiskAnnResult<u64>;
53}
54
55/// Storage metadata
56#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
57pub struct StorageMetadata {
58    pub version: String,
59    pub config: DiskAnnConfig,
60    pub num_vectors: usize,
61    #[bincode(with_serde)]
62    pub created_at: chrono::DateTime<chrono::Utc>,
63    #[bincode(with_serde)]
64    pub updated_at: chrono::DateTime<chrono::Utc>,
65}
66
67impl StorageMetadata {
68    pub fn new(config: DiskAnnConfig) -> Self {
69        let now = chrono::Utc::now();
70        Self {
71            version: env!("CARGO_PKG_VERSION").to_string(),
72            config,
73            num_vectors: 0,
74            created_at: now,
75            updated_at: now,
76        }
77    }
78
79    pub fn update_timestamp(&mut self) {
80        self.updated_at = chrono::Utc::now();
81    }
82}
83
84/// Standard disk storage with buffered I/O
85#[derive(Debug)]
86pub struct DiskStorage {
87    base_path: PathBuf,
88    vector_file: Option<PathBuf>,
89    graph_file: Option<PathBuf>,
90    metadata_file: Option<PathBuf>,
91    dimension: usize,
92    vector_cache: HashMap<VectorId, Vec<f32>>,
93    cache_limit: usize,
94}
95
96impl DiskStorage {
97    /// Create new disk storage at given path
98    pub fn new<P: AsRef<Path>>(base_path: P, dimension: usize) -> DiskAnnResult<Self> {
99        let base_path = base_path.as_ref().to_path_buf();
100
101        // Create directory if it doesn't exist
102        if !base_path.exists() {
103            std::fs::create_dir_all(&base_path).map_err(|e| DiskAnnError::IoError {
104                message: format!("Failed to create directory: {}", e),
105            })?;
106        }
107
108        let vector_file = Some(base_path.join("vectors.bin"));
109        let graph_file = Some(base_path.join("graph.bin"));
110        let metadata_file = Some(base_path.join("metadata.json"));
111
112        Ok(Self {
113            base_path,
114            vector_file,
115            graph_file,
116            metadata_file,
117            dimension,
118            vector_cache: HashMap::new(),
119            cache_limit: 1000,
120        })
121    }
122
123    /// Set cache limit (number of vectors to keep in memory)
124    pub fn with_cache_limit(mut self, limit: usize) -> Self {
125        self.cache_limit = limit;
126        self
127    }
128
129    /// Get vector file path
130    pub fn vector_file_path(&self) -> &Option<PathBuf> {
131        &self.vector_file
132    }
133
134    /// Get graph file path
135    pub fn graph_file_path(&self) -> &Option<PathBuf> {
136        &self.graph_file
137    }
138
139    /// Evict old entries from cache if needed
140    fn evict_cache_if_needed(&mut self) {
141        if self.vector_cache.len() > self.cache_limit {
142            // Simple eviction: remove first entry
143            if let Some(key) = self.vector_cache.keys().next().cloned() {
144                self.vector_cache.remove(&key);
145            }
146        }
147    }
148}
149
150impl Clone for DiskStorage {
151    fn clone(&self) -> Self {
152        Self {
153            base_path: self.base_path.clone(),
154            vector_file: self.vector_file.clone(),
155            graph_file: self.graph_file.clone(),
156            metadata_file: self.metadata_file.clone(),
157            dimension: self.dimension,
158            vector_cache: HashMap::new(), // Don't clone cache
159            cache_limit: self.cache_limit,
160        }
161    }
162}
163
164impl StorageBackend for DiskStorage {
165    fn write_vector(&mut self, vector_id: &VectorId, vector: &[f32]) -> DiskAnnResult<()> {
166        if vector.len() != self.dimension {
167            return Err(DiskAnnError::DimensionMismatch {
168                expected: self.dimension,
169                actual: vector.len(),
170            });
171        }
172
173        // Add to cache
174        self.vector_cache.insert(vector_id.clone(), vector.to_vec());
175        self.evict_cache_if_needed();
176
177        // Append to vector file
178        if let Some(path) = &self.vector_file {
179            let file = OpenOptions::new()
180                .create(true)
181                .append(true)
182                .open(path)
183                .map_err(|e| DiskAnnError::IoError {
184                    message: format!("Failed to open vector file: {}", e),
185                })?;
186
187            let mut writer = BufWriter::new(file);
188
189            // Write vector ID length and ID
190            let id_bytes = vector_id.as_bytes();
191            writer
192                .write_all(&(id_bytes.len() as u32).to_le_bytes())
193                .map_err(|e| DiskAnnError::IoError {
194                    message: format!("Failed to write vector ID length: {}", e),
195                })?;
196            writer
197                .write_all(id_bytes)
198                .map_err(|e| DiskAnnError::IoError {
199                    message: format!("Failed to write vector ID: {}", e),
200                })?;
201
202            // Write vector data
203            for &value in vector {
204                writer
205                    .write_all(&value.to_le_bytes())
206                    .map_err(|e| DiskAnnError::IoError {
207                        message: format!("Failed to write vector data: {}", e),
208                    })?;
209            }
210
211            writer.flush().map_err(|e| DiskAnnError::IoError {
212                message: format!("Failed to flush vector file: {}", e),
213            })?;
214        }
215
216        Ok(())
217    }
218
219    fn read_vector(&self, vector_id: &VectorId) -> DiskAnnResult<Vec<f32>> {
220        // Check cache first
221        if let Some(vector) = self.vector_cache.get(vector_id) {
222            return Ok(vector.clone());
223        }
224
225        // Read from disk
226        if let Some(path) = &self.vector_file {
227            if !path.exists() {
228                return Err(DiskAnnError::VectorNotFound {
229                    id: vector_id.clone(),
230                });
231            }
232
233            let file = File::open(path).map_err(|e| DiskAnnError::IoError {
234                message: format!("Failed to open vector file: {}", e),
235            })?;
236            let mut reader = BufReader::new(file);
237
238            // Sequential scan (inefficient, but simple for now)
239            loop {
240                // Read ID length
241                let mut id_len_bytes = [0u8; 4];
242                if reader.read_exact(&mut id_len_bytes).is_err() {
243                    break; // End of file
244                }
245                let id_len = u32::from_le_bytes(id_len_bytes) as usize;
246
247                // Read ID
248                let mut id_bytes = vec![0u8; id_len];
249                reader
250                    .read_exact(&mut id_bytes)
251                    .map_err(|e| DiskAnnError::IoError {
252                        message: format!("Failed to read vector ID: {}", e),
253                    })?;
254                let id = String::from_utf8(id_bytes).map_err(|e| DiskAnnError::IoError {
255                    message: format!("Invalid UTF-8 in vector ID: {}", e),
256                })?;
257
258                // Read vector data
259                let mut vector = vec![0.0f32; self.dimension];
260                for value in &mut vector {
261                    let mut bytes = [0u8; 4];
262                    reader
263                        .read_exact(&mut bytes)
264                        .map_err(|e| DiskAnnError::IoError {
265                            message: format!("Failed to read vector data: {}", e),
266                        })?;
267                    *value = f32::from_le_bytes(bytes);
268                }
269
270                if &id == vector_id {
271                    return Ok(vector);
272                }
273            }
274
275            Err(DiskAnnError::VectorNotFound {
276                id: vector_id.clone(),
277            })
278        } else {
279            Err(DiskAnnError::VectorNotFound {
280                id: vector_id.clone(),
281            })
282        }
283    }
284
285    fn write_graph(&mut self, graph: &VamanaGraph) -> DiskAnnResult<()> {
286        if let Some(path) = &self.graph_file {
287            let file = File::create(path).map_err(|e| DiskAnnError::IoError {
288                message: format!("Failed to create graph file: {}", e),
289            })?;
290
291            let mut writer = BufWriter::new(file);
292            bincode::encode_into_std_write(graph, &mut writer, bincode::config::standard())?;
293        }
294        Ok(())
295    }
296
297    fn read_graph(&self) -> DiskAnnResult<VamanaGraph> {
298        if let Some(path) = &self.graph_file {
299            if !path.exists() {
300                return Err(DiskAnnError::StorageError {
301                    message: "Graph file does not exist".to_string(),
302                });
303            }
304
305            let file = File::open(path).map_err(|e| DiskAnnError::IoError {
306                message: format!("Failed to open graph file: {}", e),
307            })?;
308
309            let mut reader = BufReader::new(file);
310            let graph = bincode::decode_from_std_read(&mut reader, bincode::config::standard())?;
311            Ok(graph)
312        } else {
313            Err(DiskAnnError::StorageError {
314                message: "Graph file path not set".to_string(),
315            })
316        }
317    }
318
319    fn write_metadata(&mut self, metadata: &StorageMetadata) -> DiskAnnResult<()> {
320        if let Some(path) = &self.metadata_file {
321            let mut file = File::create(path).map_err(|e| DiskAnnError::IoError {
322                message: format!("Failed to create metadata file: {}", e),
323            })?;
324
325            serde_json::to_writer_pretty(&mut file, metadata).map_err(|e| {
326                DiskAnnError::SerializationError {
327                    message: format!("Failed to serialize metadata: {}", e),
328                }
329            })?;
330
331            // Explicitly sync to disk
332            file.sync_all().map_err(|e| DiskAnnError::IoError {
333                message: format!("Failed to sync metadata file: {}", e),
334            })?;
335        }
336        Ok(())
337    }
338
339    fn read_metadata(&self) -> DiskAnnResult<StorageMetadata> {
340        if let Some(path) = &self.metadata_file {
341            if !path.exists() {
342                return Err(DiskAnnError::StorageError {
343                    message: "Metadata file does not exist".to_string(),
344                });
345            }
346
347            let file = File::open(path).map_err(|e| DiskAnnError::IoError {
348                message: format!("Failed to open metadata file: {}", e),
349            })?;
350
351            let metadata =
352                serde_json::from_reader(file).map_err(|e| DiskAnnError::SerializationError {
353                    message: format!("Failed to deserialize metadata: {}", e),
354                })?;
355
356            Ok(metadata)
357        } else {
358            Err(DiskAnnError::StorageError {
359                message: "Metadata file path not set".to_string(),
360            })
361        }
362    }
363
364    fn clear(&mut self) -> DiskAnnResult<()> {
365        self.vector_cache.clear();
366
367        if let Some(path) = &self.vector_file {
368            if path.exists() {
369                std::fs::remove_file(path).map_err(|e| DiskAnnError::IoError {
370                    message: format!("Failed to remove vector file: {}", e),
371                })?;
372            }
373        }
374
375        if let Some(path) = &self.graph_file {
376            if path.exists() {
377                std::fs::remove_file(path).map_err(|e| DiskAnnError::IoError {
378                    message: format!("Failed to remove graph file: {}", e),
379                })?;
380            }
381        }
382
383        if let Some(path) = &self.metadata_file {
384            if path.exists() {
385                std::fs::remove_file(path).map_err(|e| DiskAnnError::IoError {
386                    message: format!("Failed to remove metadata file: {}", e),
387                })?;
388            }
389        }
390
391        Ok(())
392    }
393
394    fn flush(&mut self) -> DiskAnnResult<()> {
395        // All writes are immediately flushed in this implementation
396        Ok(())
397    }
398
399    fn size(&self) -> DiskAnnResult<u64> {
400        let mut total_size = 0u64;
401
402        if let Some(path) = &self.vector_file {
403            if path.exists() {
404                total_size += std::fs::metadata(path)
405                    .map_err(|e| DiskAnnError::IoError {
406                        message: format!("Failed to get vector file size: {}", e),
407                    })?
408                    .len();
409            }
410        }
411
412        if let Some(path) = &self.graph_file {
413            if path.exists() {
414                total_size += std::fs::metadata(path)
415                    .map_err(|e| DiskAnnError::IoError {
416                        message: format!("Failed to get graph file size: {}", e),
417                    })?
418                    .len();
419            }
420        }
421
422        if let Some(path) = &self.metadata_file {
423            if path.exists() {
424                total_size += std::fs::metadata(path)
425                    .map_err(|e| DiskAnnError::IoError {
426                        message: format!("Failed to get metadata file size: {}", e),
427                    })?
428                    .len();
429            }
430        }
431
432        Ok(total_size)
433    }
434}
435
436/// Memory-mapped storage (stub for future implementation)
437#[derive(Debug, Clone, Serialize, Deserialize)]
438pub struct MemoryMappedStorage {
439    base_path: PathBuf,
440    dimension: usize,
441}
442
443impl MemoryMappedStorage {
444    pub fn new<P: AsRef<Path>>(base_path: P, dimension: usize) -> DiskAnnResult<Self> {
445        Ok(Self {
446            base_path: base_path.as_ref().to_path_buf(),
447            dimension,
448        })
449    }
450}
451
452impl StorageBackend for MemoryMappedStorage {
453    fn write_vector(&mut self, _vector_id: &VectorId, _vector: &[f32]) -> DiskAnnResult<()> {
454        Err(DiskAnnError::StorageError {
455            message: "MemoryMappedStorage not yet implemented".to_string(),
456        })
457    }
458
459    fn read_vector(&self, _vector_id: &VectorId) -> DiskAnnResult<Vec<f32>> {
460        Err(DiskAnnError::StorageError {
461            message: "MemoryMappedStorage not yet implemented".to_string(),
462        })
463    }
464
465    fn write_graph(&mut self, _graph: &VamanaGraph) -> DiskAnnResult<()> {
466        Err(DiskAnnError::StorageError {
467            message: "MemoryMappedStorage not yet implemented".to_string(),
468        })
469    }
470
471    fn read_graph(&self) -> DiskAnnResult<VamanaGraph> {
472        Err(DiskAnnError::StorageError {
473            message: "MemoryMappedStorage not yet implemented".to_string(),
474        })
475    }
476
477    fn write_metadata(&mut self, _metadata: &StorageMetadata) -> DiskAnnResult<()> {
478        Err(DiskAnnError::StorageError {
479            message: "MemoryMappedStorage not yet implemented".to_string(),
480        })
481    }
482
483    fn read_metadata(&self) -> DiskAnnResult<StorageMetadata> {
484        Err(DiskAnnError::StorageError {
485            message: "MemoryMappedStorage not yet implemented".to_string(),
486        })
487    }
488
489    fn clear(&mut self) -> DiskAnnResult<()> {
490        Ok(())
491    }
492
493    fn flush(&mut self) -> DiskAnnResult<()> {
494        Ok(())
495    }
496
497    fn size(&self) -> DiskAnnResult<u64> {
498        Ok(0)
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diskann::config::PruningStrategy;
    use std::env;

    /// Unique scratch directory that is deleted again when dropped, so tests
    /// clean up after themselves even when an assertion fails.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        /// Picks a fresh path under the system temp directory. The directory
        /// itself is created later by `DiskStorage::new`.
        fn new() -> Self {
            use std::sync::atomic::{AtomicU64, Ordering};
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            let id = COUNTER.fetch_add(1, Ordering::Relaxed);
            let path = env::temp_dir().join(format!(
                "diskann_storage_test_{}_{}_{}",
                std::process::id(),
                chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
                id
            ));
            // Remove leftovers from an earlier run, if any.
            std::fs::remove_dir_all(&path).ok();
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    #[test]
    fn test_disk_storage_vector_write_read() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let vector = vec![1.0, 2.0, 3.0];
        storage.write_vector(&"vec1".to_string(), &vector).unwrap();

        let read_vector = storage.read_vector(&"vec1".to_string()).unwrap();
        assert_eq!(read_vector, vector);

        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_dimension_mismatch() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let vector = vec![1.0, 2.0]; // Wrong dimension
        let result = storage.write_vector(&"vec1".to_string(), &vector);

        assert!(matches!(
            result,
            Err(DiskAnnError::DimensionMismatch {
                expected: 3,
                actual: 2
            })
        ));
        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_graph() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let mut graph = VamanaGraph::new(3, PruningStrategy::Alpha, 1.2);
        graph.add_node("v1".to_string()).unwrap();
        graph.add_node("v2".to_string()).unwrap();

        storage.write_graph(&graph).unwrap();
        let read_graph = storage.read_graph().unwrap();

        assert_eq!(read_graph.num_nodes(), 2);
        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_metadata() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 128).unwrap();

        let config = DiskAnnConfig::default_config(128);
        let metadata = StorageMetadata::new(config);

        storage.write_metadata(&metadata).unwrap();
        let read_metadata = storage.read_metadata().unwrap();

        assert_eq!(read_metadata.config.dimension, 128);
        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_size() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let initial_size = storage.size().unwrap();
        assert_eq!(initial_size, 0);

        let vector = vec![1.0, 2.0, 3.0];
        storage.write_vector(&"vec1".to_string(), &vector).unwrap();

        let after_write = storage.size().unwrap();
        assert!(after_write > initial_size);

        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_cache() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3)
            .unwrap()
            .with_cache_limit(2);

        storage
            .write_vector(&"v1".to_string(), &[1.0, 2.0, 3.0])
            .unwrap();
        storage
            .write_vector(&"v2".to_string(), &[4.0, 5.0, 6.0])
            .unwrap();
        storage
            .write_vector(&"v3".to_string(), &[7.0, 8.0, 9.0])
            .unwrap();

        // Cache should have at most 2 entries
        assert!(storage.vector_cache.len() <= 2);

        storage.clear().unwrap();
    }

    #[test]
    fn test_vector_not_found() {
        let dir = ScratchDir::new();
        let storage = DiskStorage::new(dir.path(), 3).unwrap();

        let result = storage.read_vector(&"nonexistent".to_string());
        assert!(matches!(result, Err(DiskAnnError::VectorNotFound { .. })));
    }

    #[test]
    fn test_storage_clear() {
        let dir = ScratchDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        storage
            .write_vector(&"v1".to_string(), &[1.0, 2.0, 3.0])
            .unwrap();

        // Verify file was created
        let vector_file = storage.vector_file.as_ref().unwrap().clone();
        assert!(
            vector_file.exists(),
            "Vector file should exist after write: {:?}",
            vector_file
        );

        storage.clear().unwrap();
        assert!(
            !vector_file.exists(),
            "Vector file should not exist after clear: {:?}",
            vector_file
        );
    }
}