oxirs_vec/diskann/
storage.rs

1//! Storage backends for DiskANN
2//!
3//! Provides abstractions for storing vectors and graph structures on disk
4//! with support for memory-mapped I/O and buffered access.
5//!
6//! ## Storage Layout
7//! - Vectors: Raw f32 arrays or PQ-compressed codes
8//! - Graph: Adjacency lists with neighbor IDs
9//! - Metadata: Index configuration and statistics
10//!
11//! ## Backends
12//! - **DiskStorage**: Standard file I/O with buffering
13//! - **MemoryMappedStorage**: Memory-mapped files for fast access
14//! - **CachedStorage**: Hybrid with LRU caching
15
16use crate::diskann::config::DiskAnnConfig;
17use crate::diskann::graph::VamanaGraph;
18use crate::diskann::types::{DiskAnnError, DiskAnnResult, VectorId};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::fs::{File, OpenOptions};
22use std::io::{BufReader, BufWriter, Read, Write};
23use std::path::{Path, PathBuf};
24
25/// Storage backend trait
26pub trait StorageBackend: Send + Sync {
27    /// Write a vector to storage
28    fn write_vector(&mut self, vector_id: &VectorId, vector: &[f32]) -> DiskAnnResult<()>;
29
30    /// Read a vector from storage
31    fn read_vector(&self, vector_id: &VectorId) -> DiskAnnResult<Vec<f32>>;
32
33    /// Write graph structure
34    fn write_graph(&mut self, graph: &VamanaGraph) -> DiskAnnResult<()>;
35
36    /// Read graph structure
37    fn read_graph(&self) -> DiskAnnResult<VamanaGraph>;
38
39    /// Write metadata
40    fn write_metadata(&mut self, metadata: &StorageMetadata) -> DiskAnnResult<()>;
41
42    /// Read metadata
43    fn read_metadata(&self) -> DiskAnnResult<StorageMetadata>;
44
45    /// Delete all data
46    fn clear(&mut self) -> DiskAnnResult<()>;
47
48    /// Flush any pending writes
49    fn flush(&mut self) -> DiskAnnResult<()>;
50
51    /// Get storage size in bytes
52    fn size(&self) -> DiskAnnResult<u64>;
53}
54
55/// Storage metadata
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct StorageMetadata {
58    pub version: String,
59    pub config: DiskAnnConfig,
60    pub num_vectors: usize,
61    pub created_at: chrono::DateTime<chrono::Utc>,
62    pub updated_at: chrono::DateTime<chrono::Utc>,
63}
64
65impl StorageMetadata {
66    pub fn new(config: DiskAnnConfig) -> Self {
67        let now = chrono::Utc::now();
68        Self {
69            version: env!("CARGO_PKG_VERSION").to_string(),
70            config,
71            num_vectors: 0,
72            created_at: now,
73            updated_at: now,
74        }
75    }
76
77    pub fn update_timestamp(&mut self) {
78        self.updated_at = chrono::Utc::now();
79    }
80}
81
82/// Standard disk storage with buffered I/O
83#[derive(Debug)]
84pub struct DiskStorage {
85    base_path: PathBuf,
86    vector_file: Option<PathBuf>,
87    graph_file: Option<PathBuf>,
88    metadata_file: Option<PathBuf>,
89    dimension: usize,
90    vector_cache: HashMap<VectorId, Vec<f32>>,
91    cache_limit: usize,
92}
93
94impl DiskStorage {
95    /// Create new disk storage at given path
96    pub fn new<P: AsRef<Path>>(base_path: P, dimension: usize) -> DiskAnnResult<Self> {
97        let base_path = base_path.as_ref().to_path_buf();
98
99        // Create directory if it doesn't exist
100        if !base_path.exists() {
101            std::fs::create_dir_all(&base_path).map_err(|e| DiskAnnError::IoError {
102                message: format!("Failed to create directory: {}", e),
103            })?;
104        }
105
106        let vector_file = Some(base_path.join("vectors.bin"));
107        let graph_file = Some(base_path.join("graph.bin"));
108        let metadata_file = Some(base_path.join("metadata.json"));
109
110        Ok(Self {
111            base_path,
112            vector_file,
113            graph_file,
114            metadata_file,
115            dimension,
116            vector_cache: HashMap::new(),
117            cache_limit: 1000,
118        })
119    }
120
121    /// Set cache limit (number of vectors to keep in memory)
122    pub fn with_cache_limit(mut self, limit: usize) -> Self {
123        self.cache_limit = limit;
124        self
125    }
126
127    /// Get vector file path
128    pub fn vector_file_path(&self) -> &Option<PathBuf> {
129        &self.vector_file
130    }
131
132    /// Get graph file path
133    pub fn graph_file_path(&self) -> &Option<PathBuf> {
134        &self.graph_file
135    }
136
137    /// Evict old entries from cache if needed
138    fn evict_cache_if_needed(&mut self) {
139        if self.vector_cache.len() > self.cache_limit {
140            // Simple eviction: remove first entry
141            if let Some(key) = self.vector_cache.keys().next().cloned() {
142                self.vector_cache.remove(&key);
143            }
144        }
145    }
146}
147
148impl Clone for DiskStorage {
149    fn clone(&self) -> Self {
150        Self {
151            base_path: self.base_path.clone(),
152            vector_file: self.vector_file.clone(),
153            graph_file: self.graph_file.clone(),
154            metadata_file: self.metadata_file.clone(),
155            dimension: self.dimension,
156            vector_cache: HashMap::new(), // Don't clone cache
157            cache_limit: self.cache_limit,
158        }
159    }
160}
161
162impl StorageBackend for DiskStorage {
163    fn write_vector(&mut self, vector_id: &VectorId, vector: &[f32]) -> DiskAnnResult<()> {
164        if vector.len() != self.dimension {
165            return Err(DiskAnnError::DimensionMismatch {
166                expected: self.dimension,
167                actual: vector.len(),
168            });
169        }
170
171        // Add to cache
172        self.vector_cache.insert(vector_id.clone(), vector.to_vec());
173        self.evict_cache_if_needed();
174
175        // Append to vector file
176        if let Some(path) = &self.vector_file {
177            let file = OpenOptions::new()
178                .create(true)
179                .append(true)
180                .open(path)
181                .map_err(|e| DiskAnnError::IoError {
182                    message: format!("Failed to open vector file: {}", e),
183                })?;
184
185            let mut writer = BufWriter::new(file);
186
187            // Write vector ID length and ID
188            let id_bytes = vector_id.as_bytes();
189            writer
190                .write_all(&(id_bytes.len() as u32).to_le_bytes())
191                .map_err(|e| DiskAnnError::IoError {
192                    message: format!("Failed to write vector ID length: {}", e),
193                })?;
194            writer
195                .write_all(id_bytes)
196                .map_err(|e| DiskAnnError::IoError {
197                    message: format!("Failed to write vector ID: {}", e),
198                })?;
199
200            // Write vector data
201            for &value in vector {
202                writer
203                    .write_all(&value.to_le_bytes())
204                    .map_err(|e| DiskAnnError::IoError {
205                        message: format!("Failed to write vector data: {}", e),
206                    })?;
207            }
208
209            writer.flush().map_err(|e| DiskAnnError::IoError {
210                message: format!("Failed to flush vector file: {}", e),
211            })?;
212        }
213
214        Ok(())
215    }
216
217    fn read_vector(&self, vector_id: &VectorId) -> DiskAnnResult<Vec<f32>> {
218        // Check cache first
219        if let Some(vector) = self.vector_cache.get(vector_id) {
220            return Ok(vector.clone());
221        }
222
223        // Read from disk
224        if let Some(path) = &self.vector_file {
225            if !path.exists() {
226                return Err(DiskAnnError::VectorNotFound {
227                    id: vector_id.clone(),
228                });
229            }
230
231            let file = File::open(path).map_err(|e| DiskAnnError::IoError {
232                message: format!("Failed to open vector file: {}", e),
233            })?;
234            let mut reader = BufReader::new(file);
235
236            // Sequential scan (inefficient, but simple for now)
237            loop {
238                // Read ID length
239                let mut id_len_bytes = [0u8; 4];
240                if reader.read_exact(&mut id_len_bytes).is_err() {
241                    break; // End of file
242                }
243                let id_len = u32::from_le_bytes(id_len_bytes) as usize;
244
245                // Read ID
246                let mut id_bytes = vec![0u8; id_len];
247                reader
248                    .read_exact(&mut id_bytes)
249                    .map_err(|e| DiskAnnError::IoError {
250                        message: format!("Failed to read vector ID: {}", e),
251                    })?;
252                let id = String::from_utf8(id_bytes).map_err(|e| DiskAnnError::IoError {
253                    message: format!("Invalid UTF-8 in vector ID: {}", e),
254                })?;
255
256                // Read vector data
257                let mut vector = vec![0.0f32; self.dimension];
258                for value in &mut vector {
259                    let mut bytes = [0u8; 4];
260                    reader
261                        .read_exact(&mut bytes)
262                        .map_err(|e| DiskAnnError::IoError {
263                            message: format!("Failed to read vector data: {}", e),
264                        })?;
265                    *value = f32::from_le_bytes(bytes);
266                }
267
268                if &id == vector_id {
269                    return Ok(vector);
270                }
271            }
272
273            Err(DiskAnnError::VectorNotFound {
274                id: vector_id.clone(),
275            })
276        } else {
277            Err(DiskAnnError::VectorNotFound {
278                id: vector_id.clone(),
279            })
280        }
281    }
282
283    fn write_graph(&mut self, graph: &VamanaGraph) -> DiskAnnResult<()> {
284        if let Some(path) = &self.graph_file {
285            let file = File::create(path).map_err(|e| DiskAnnError::IoError {
286                message: format!("Failed to create graph file: {}", e),
287            })?;
288
289            let mut writer = BufWriter::new(file);
290            oxicode::serde::encode_into_std_write(graph, &mut writer, oxicode::config::standard())?;
291        }
292        Ok(())
293    }
294
295    fn read_graph(&self) -> DiskAnnResult<VamanaGraph> {
296        if let Some(path) = &self.graph_file {
297            if !path.exists() {
298                return Err(DiskAnnError::StorageError {
299                    message: "Graph file does not exist".to_string(),
300                });
301            }
302
303            let file = File::open(path).map_err(|e| DiskAnnError::IoError {
304                message: format!("Failed to open graph file: {}", e),
305            })?;
306
307            let mut reader = BufReader::new(file);
308            let (graph, _) =
309                oxicode::serde::decode_from_std_read(&mut reader, oxicode::config::standard())?;
310            Ok(graph)
311        } else {
312            Err(DiskAnnError::StorageError {
313                message: "Graph file path not set".to_string(),
314            })
315        }
316    }
317
318    fn write_metadata(&mut self, metadata: &StorageMetadata) -> DiskAnnResult<()> {
319        if let Some(path) = &self.metadata_file {
320            let mut file = File::create(path).map_err(|e| DiskAnnError::IoError {
321                message: format!("Failed to create metadata file: {}", e),
322            })?;
323
324            serde_json::to_writer_pretty(&mut file, metadata).map_err(|e| {
325                DiskAnnError::SerializationError {
326                    message: format!("Failed to serialize metadata: {}", e),
327                }
328            })?;
329
330            // Explicitly sync to disk
331            file.sync_all().map_err(|e| DiskAnnError::IoError {
332                message: format!("Failed to sync metadata file: {}", e),
333            })?;
334        }
335        Ok(())
336    }
337
338    fn read_metadata(&self) -> DiskAnnResult<StorageMetadata> {
339        if let Some(path) = &self.metadata_file {
340            if !path.exists() {
341                return Err(DiskAnnError::StorageError {
342                    message: "Metadata file does not exist".to_string(),
343                });
344            }
345
346            let file = File::open(path).map_err(|e| DiskAnnError::IoError {
347                message: format!("Failed to open metadata file: {}", e),
348            })?;
349
350            let metadata =
351                serde_json::from_reader(file).map_err(|e| DiskAnnError::SerializationError {
352                    message: format!("Failed to deserialize metadata: {}", e),
353                })?;
354
355            Ok(metadata)
356        } else {
357            Err(DiskAnnError::StorageError {
358                message: "Metadata file path not set".to_string(),
359            })
360        }
361    }
362
363    fn clear(&mut self) -> DiskAnnResult<()> {
364        self.vector_cache.clear();
365
366        if let Some(path) = &self.vector_file {
367            if path.exists() {
368                std::fs::remove_file(path).map_err(|e| DiskAnnError::IoError {
369                    message: format!("Failed to remove vector file: {}", e),
370                })?;
371            }
372        }
373
374        if let Some(path) = &self.graph_file {
375            if path.exists() {
376                std::fs::remove_file(path).map_err(|e| DiskAnnError::IoError {
377                    message: format!("Failed to remove graph file: {}", e),
378                })?;
379            }
380        }
381
382        if let Some(path) = &self.metadata_file {
383            if path.exists() {
384                std::fs::remove_file(path).map_err(|e| DiskAnnError::IoError {
385                    message: format!("Failed to remove metadata file: {}", e),
386                })?;
387            }
388        }
389
390        Ok(())
391    }
392
393    fn flush(&mut self) -> DiskAnnResult<()> {
394        // All writes are immediately flushed in this implementation
395        Ok(())
396    }
397
398    fn size(&self) -> DiskAnnResult<u64> {
399        let mut total_size = 0u64;
400
401        if let Some(path) = &self.vector_file {
402            if path.exists() {
403                total_size += std::fs::metadata(path)
404                    .map_err(|e| DiskAnnError::IoError {
405                        message: format!("Failed to get vector file size: {}", e),
406                    })?
407                    .len();
408            }
409        }
410
411        if let Some(path) = &self.graph_file {
412            if path.exists() {
413                total_size += std::fs::metadata(path)
414                    .map_err(|e| DiskAnnError::IoError {
415                        message: format!("Failed to get graph file size: {}", e),
416                    })?
417                    .len();
418            }
419        }
420
421        if let Some(path) = &self.metadata_file {
422            if path.exists() {
423                total_size += std::fs::metadata(path)
424                    .map_err(|e| DiskAnnError::IoError {
425                        message: format!("Failed to get metadata file size: {}", e),
426                    })?
427                    .len();
428            }
429        }
430
431        Ok(total_size)
432    }
433}
434
435/// Memory-mapped storage (stub for future implementation)
436#[derive(Debug, Clone, Serialize, Deserialize)]
437pub struct MemoryMappedStorage {
438    base_path: PathBuf,
439    dimension: usize,
440}
441
442impl MemoryMappedStorage {
443    pub fn new<P: AsRef<Path>>(base_path: P, dimension: usize) -> DiskAnnResult<Self> {
444        Ok(Self {
445            base_path: base_path.as_ref().to_path_buf(),
446            dimension,
447        })
448    }
449}
450
451impl StorageBackend for MemoryMappedStorage {
452    fn write_vector(&mut self, _vector_id: &VectorId, _vector: &[f32]) -> DiskAnnResult<()> {
453        Err(DiskAnnError::StorageError {
454            message: "MemoryMappedStorage not yet implemented".to_string(),
455        })
456    }
457
458    fn read_vector(&self, _vector_id: &VectorId) -> DiskAnnResult<Vec<f32>> {
459        Err(DiskAnnError::StorageError {
460            message: "MemoryMappedStorage not yet implemented".to_string(),
461        })
462    }
463
464    fn write_graph(&mut self, _graph: &VamanaGraph) -> DiskAnnResult<()> {
465        Err(DiskAnnError::StorageError {
466            message: "MemoryMappedStorage not yet implemented".to_string(),
467        })
468    }
469
470    fn read_graph(&self) -> DiskAnnResult<VamanaGraph> {
471        Err(DiskAnnError::StorageError {
472            message: "MemoryMappedStorage not yet implemented".to_string(),
473        })
474    }
475
476    fn write_metadata(&mut self, _metadata: &StorageMetadata) -> DiskAnnResult<()> {
477        Err(DiskAnnError::StorageError {
478            message: "MemoryMappedStorage not yet implemented".to_string(),
479        })
480    }
481
482    fn read_metadata(&self) -> DiskAnnResult<StorageMetadata> {
483        Err(DiskAnnError::StorageError {
484            message: "MemoryMappedStorage not yet implemented".to_string(),
485        })
486    }
487
488    fn clear(&mut self) -> DiskAnnResult<()> {
489        Ok(())
490    }
491
492    fn flush(&mut self) -> DiskAnnResult<()> {
493        Ok(())
494    }
495
496    fn size(&self) -> DiskAnnResult<u64> {
497        Ok(0)
498    }
499}
500
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diskann::config::PruningStrategy;
    use std::env;

    /// Returns a path under the system temp directory that is unique per call.
    fn temp_dir() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        env::temp_dir().join(format!(
            "diskann_storage_test_{}_{}",
            chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
            id
        ))
    }

    /// Per-test directory that is removed when dropped.
    ///
    /// `DiskStorage::clear` deletes the data files but not the directory itself, and a
    /// test that panics never reaches its cleanup code; this guard covers both cases.
    struct TestDir(PathBuf);

    impl TestDir {
        fn new() -> Self {
            let path = temp_dir();
            std::fs::remove_dir_all(&path).ok(); // Clean up a leftover, if any
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    #[test]
    fn test_disk_storage_vector_write_read() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let vector = vec![1.0, 2.0, 3.0];
        storage.write_vector(&"vec1".to_string(), &vector).unwrap();

        let read_vector = storage.read_vector(&"vec1".to_string()).unwrap();
        assert_eq!(read_vector, vector);

        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_read_from_file_without_cache() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();
        storage
            .write_vector(&"v1".to_string(), &[1.0, 2.0, 3.0])
            .unwrap();
        storage
            .write_vector(&"v2".to_string(), &[4.0, 5.0, 6.0])
            .unwrap();

        // Clones start with an empty cache, so these reads must come from the file.
        let uncached = storage.clone();
        assert_eq!(
            uncached.read_vector(&"v2".to_string()).unwrap(),
            vec![4.0, 5.0, 6.0]
        );
        assert_eq!(
            uncached.read_vector(&"v1".to_string()).unwrap(),
            vec![1.0, 2.0, 3.0]
        );
        assert!(uncached.read_vector(&"v3".to_string()).is_err());
    }

    #[test]
    fn test_disk_storage_dimension_mismatch() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let vector = vec![1.0, 2.0]; // Wrong dimension
        let result = storage.write_vector(&"vec1".to_string(), &vector);

        assert!(result.is_err());
        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_graph() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let mut graph = VamanaGraph::new(3, PruningStrategy::Alpha, 1.2);
        graph.add_node("v1".to_string()).unwrap();
        graph.add_node("v2".to_string()).unwrap();

        storage.write_graph(&graph).unwrap();
        let read_graph = storage.read_graph().unwrap();

        assert_eq!(read_graph.num_nodes(), 2);
        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_metadata() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 128).unwrap();

        let config = DiskAnnConfig::default_config(128);
        let metadata = StorageMetadata::new(config);

        storage.write_metadata(&metadata).unwrap();
        let read_metadata = storage.read_metadata().unwrap();

        assert_eq!(read_metadata.config.dimension, 128);
        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_size() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        let initial_size = storage.size().unwrap();
        assert_eq!(initial_size, 0);

        let vector = vec![1.0, 2.0, 3.0];
        storage.write_vector(&"vec1".to_string(), &vector).unwrap();

        let after_write = storage.size().unwrap();
        assert!(after_write > initial_size);

        storage.clear().unwrap();
    }

    #[test]
    fn test_disk_storage_cache() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3)
            .unwrap()
            .with_cache_limit(2);

        storage
            .write_vector(&"v1".to_string(), &[1.0, 2.0, 3.0])
            .unwrap();
        storage
            .write_vector(&"v2".to_string(), &[4.0, 5.0, 6.0])
            .unwrap();
        storage
            .write_vector(&"v3".to_string(), &[7.0, 8.0, 9.0])
            .unwrap();

        // Cache should have at most 2 entries
        assert!(storage.vector_cache.len() <= 2);

        storage.clear().unwrap();
    }

    #[test]
    fn test_vector_not_found() {
        let dir = TestDir::new();
        let storage = DiskStorage::new(dir.path(), 3).unwrap();

        let result = storage.read_vector(&"nonexistent".to_string());
        assert!(result.is_err());
    }

    #[test]
    fn test_storage_clear() {
        let dir = TestDir::new();
        let mut storage = DiskStorage::new(dir.path(), 3).unwrap();

        storage
            .write_vector(&"v1".to_string(), &[1.0, 2.0, 3.0])
            .unwrap();

        // Verify file was created
        let vector_file = storage.vector_file.as_ref().unwrap().clone();
        assert!(
            vector_file.exists(),
            "Vector file should exist after write: {:?}",
            vector_file
        );

        storage.clear().unwrap();
        assert!(
            !vector_file.exists(),
            "Vector file should not exist after clear: {:?}",
            vector_file
        );
    }

    #[test]
    fn test_memory_mapped_storage_is_stub() {
        // `MemoryMappedStorage::new` does not touch the filesystem.
        let mut storage = MemoryMappedStorage::new(temp_dir(), 3).unwrap();

        assert!(storage
            .write_vector(&"v1".to_string(), &[1.0, 2.0, 3.0])
            .is_err());
        assert!(storage.read_vector(&"v1".to_string()).is_err());
        assert!(storage.read_graph().is_err());
        assert!(storage.read_metadata().is_err());
        assert_eq!(storage.size().unwrap(), 0);
        storage.clear().unwrap();
        storage.flush().unwrap();
    }
}