velesdb_core/storage/mmap.rs

//! Memory-mapped file storage for vectors.
//!
//! Uses a combination of an index file (ID -> offset) and a data file (raw vectors).
//! Also implements a simple WAL for durability.
//!
//! # P2 Optimization: Aggressive Pre-allocation
//!
//! To minimize blocking during `ensure_capacity` (which requires a write lock),
//! we use aggressive pre-allocation:
//! - Initial size: 16MB (vs 64KB before) - handles most small-medium datasets
//! - Growth factor: 2x minimum with 64MB floor - fewer resize operations
//! - Explicit `reserve_capacity()` for bulk imports
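//!
//! # Example
//!
//! A minimal usage sketch (the directory and the re-exported import paths shown
//! here are assumptions for illustration; adjust to the crate's actual exports):
//!
//! ```rust,ignore
//! use velesdb_core::storage::{MmapStorage, VectorStorage};
//!
//! let mut storage = MmapStorage::new("/tmp/veles_demo", 4)?;
//! storage.store(1, &[0.1, 0.2, 0.3, 0.4])?;   // appended to the WAL, then written to the mmap
//! let v = storage.retrieve(1)?;                // copies the vector out of the mmap
//! assert_eq!(v.as_deref(), Some(&[0.1f32, 0.2, 0.3, 0.4][..]));
//! storage.flush()?;                            // persists the ID -> offset index
//! ```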

use super::guard::VectorSliceGuard;
use super::metrics::StorageMetrics;
use super::traits::VectorStorage;

use memmap2::MmapMut;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Memory-mapped file storage for vectors.
///
/// Uses a combination of an index file (ID -> offset) and a data file (raw vectors).
/// Also implements a simple WAL for durability.
#[allow(clippy::module_name_repetitions)]
pub struct MmapStorage {
    /// Directory path for storage files
    path: PathBuf,
    /// Vector dimension
    dimension: usize,
    /// In-memory index of ID -> file offset
    /// Perf: `FxHashMap` is faster than std `HashMap` for integer keys
    index: RwLock<FxHashMap<u64, usize>>,
    /// Write-Ahead Log writer
    wal: RwLock<io::BufWriter<File>>,
    /// File handle for the data file (kept open for resizing)
    data_file: File,
    /// Memory mapped data file
    mmap: RwLock<MmapMut>,
    /// Next available offset in the data file
    next_offset: AtomicUsize,
    /// P0 Audit: Metrics for monitoring `ensure_capacity` latency
    metrics: Arc<StorageMetrics>,
}

impl MmapStorage {
    /// P2: Increased from 64KB to 16MB for better initial capacity.
    /// This handles most small-medium datasets without any resize operations.
    const INITIAL_SIZE: u64 = 16 * 1024 * 1024; // 16MB initial size

    /// P2: Increased from 1MB to 64MB minimum growth.
    /// Fewer resize operations = fewer blocking write locks.
    const MIN_GROWTH: u64 = 64 * 1024 * 1024; // Minimum 64MB growth

    /// P2: Growth factor for exponential pre-allocation.
    /// Each resize at least doubles capacity for amortized O(1) growth.
    const GROWTH_FACTOR: u64 = 2;

    /// Creates a new `MmapStorage` or opens an existing one.
    ///
    /// # Arguments
    ///
    /// * `path` - Directory to store data
    /// * `dimension` - Vector dimension
    ///
    /// # Errors
    ///
    /// Returns an error if file operations fail.
    pub fn new<P: AsRef<Path>>(path: P, dimension: usize) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;

        // 1. Open/Create Data File
        let data_path = path.join("vectors.dat");
        let data_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&data_path)?;

        let file_len = data_file.metadata()?.len();
        if file_len == 0 {
            data_file.set_len(Self::INITIAL_SIZE)?;
        }

        let mmap = unsafe { MmapMut::map_mut(&data_file)? };

        // 2. Open/Create WAL
        let wal_path = path.join("vectors.wal");
        let wal_file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&wal_path)?;
        let wal = io::BufWriter::new(wal_file);

        // 3. Load Index
        let index_path = path.join("vectors.idx");
        let (index, next_offset) = if index_path.exists() {
            let file = File::open(&index_path)?;
            let index: FxHashMap<u64, usize> = bincode::deserialize_from(io::BufReader::new(file))
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

            // Calculate next_offset based on stored data
            // Simple approach: max(offset) + size
            let max_offset = index.values().max().copied().unwrap_or(0);
            let size = if index.is_empty() {
                0
            } else {
                max_offset + dimension * 4
            };
            (index, size)
        } else {
            (FxHashMap::default(), 0)
        };

        Ok(Self {
            path,
            dimension,
            index: RwLock::new(index),
            wal: RwLock::new(wal),
            data_file,
            mmap: RwLock::new(mmap),
            next_offset: AtomicUsize::new(next_offset),
            metrics: Arc::new(StorageMetrics::new()),
        })
    }

    /// Ensures the memory map is at least `required_len` bytes long.
    ///
    /// # P2 Optimization
    ///
    /// Uses aggressive pre-allocation to minimize blocking:
    /// - Exponential growth (2x) for amortized O(1)
    /// - 64MB minimum growth to reduce resize frequency
    /// - For 1M vectors × 768D × 4 bytes = 3GB, only ~6 resizes needed
    ///
    /// # P0 Audit: Latency Monitoring
    ///
    /// This operation is instrumented to track latency. Monitor P99 latency
    /// via `metrics()` to detect "stop-the-world" pauses during large resizes.
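    ///
    /// # Example
    ///
    /// A worked sketch of the size computation with hypothetical numbers,
    /// mirroring the `max()` chain in the body below:
    ///
    /// ```rust,ignore
    /// let current_len: u64 = 16 * 1024 * 1024; // 16 MiB currently mapped
    /// let required: u64 = 20 * 1024 * 1024;    // a write needs byte 20 MiB
    /// let new_len = (current_len * 2)                  // doubled: 32 MiB
    ///     .max(required + 64 * 1024 * 1024)            // required + MIN_GROWTH: 84 MiB
    ///     .max(current_len + 64 * 1024 * 1024)         // current + MIN_GROWTH: 80 MiB
    ///     .max(required);
    /// assert_eq!(new_len, 84 * 1024 * 1024);           // the headroom option wins here
    /// ```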
    fn ensure_capacity(&mut self, required_len: usize) -> io::Result<()> {
        let start = Instant::now();
        let mut did_resize = false;
        let mut bytes_resized = 0u64;

        let mut mmap = self.mmap.write();
        if mmap.len() < required_len {
            // Flush current mmap before unmapping
            mmap.flush()?;

            // P2: Aggressive pre-allocation strategy
            // Calculate new size with exponential growth
            let current_len = mmap.len() as u64;
            let required_u64 = required_len as u64;

            // Option 1: Double current size (exponential growth)
            let doubled = current_len.saturating_mul(Self::GROWTH_FACTOR);
            // Option 2: Required + MIN_GROWTH headroom
            let with_headroom = required_u64.saturating_add(Self::MIN_GROWTH);
            // Option 3: Just the minimum growth
            let min_growth = current_len.saturating_add(Self::MIN_GROWTH);

            // Take the maximum to ensure both sufficient space and good amortization
            let new_len = doubled.max(with_headroom).max(min_growth).max(required_u64);

            // Resize file
            self.data_file.set_len(new_len)?;

            // Remap
            *mmap = unsafe { MmapMut::map_mut(&self.data_file)? };

            did_resize = true;
            bytes_resized = new_len.saturating_sub(current_len);
        }

        // P0 Audit: Record latency metrics
        self.metrics
            .record_ensure_capacity(start.elapsed(), did_resize, bytes_resized);

        Ok(())
    }

    /// Pre-allocates storage capacity for a known number of vectors.
    ///
    /// Call this before bulk imports to avoid blocking resize operations
    /// during insertion. This is especially useful when the final dataset
    /// size is known in advance.
    ///
    /// # P2 Optimization
    ///
    /// This allows users to pre-allocate once and avoid all resize locks
    /// during bulk import operations.
    ///
    /// # Arguments
    ///
    /// * `vector_count` - Expected number of vectors to store
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Pre-allocate for 1 million vectors before bulk import
    /// storage.reserve_capacity(1_000_000)?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if file operations fail.
    pub fn reserve_capacity(&mut self, vector_count: usize) -> io::Result<()> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let required_len = vector_count.saturating_mul(vector_size);

        // Add 10% headroom for safety
        let with_headroom = required_len.saturating_add(required_len / 10);

        self.ensure_capacity(with_headroom)
    }

    /// Returns a reference to the storage metrics.
    ///
    /// # P0 Audit: Latency Monitoring
    ///
    /// Use this to monitor `ensure_capacity` latency, especially P99.
    /// High P99 latency indicates "stop-the-world" pauses during mmap resizes.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let storage = MmapStorage::new(path, 768)?;
    /// // ... perform operations ...
    /// let stats = storage.metrics().ensure_capacity_latency_stats();
    /// if stats.p99_exceeds(Duration::from_millis(100)) {
    ///     warn!("High P99 latency detected: {:?}", stats.p99());
    /// }
    /// ```
    #[must_use]
    pub fn metrics(&self) -> &StorageMetrics {
        &self.metrics
    }

    /// Compacts the storage by rewriting only active vectors.
    ///
    /// This reclaims disk space from deleted vectors by:
    /// 1. Writing all active vectors to a new temporary file
    /// 2. Atomically replacing the old file with the new one
    ///
    /// # TS-CORE-004: Storage Compaction
    ///
    /// This operation is quasi-atomic via `rename()` for crash safety.
    /// Reads remain available during compaction (copy-on-write pattern).
    ///
    /// # Returns
    ///
    /// The number of bytes reclaimed.
    ///
    /// # Errors
    ///
    /// Returns an error if file operations fail.
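    ///
    /// # Example
    ///
    /// A minimal compaction-policy sketch using the threshold suggested by
    /// `fragmentation_ratio` (assumes a mutable `storage`):
    ///
    /// ```rust,ignore
    /// if storage.fragmentation_ratio() > 0.3 {
    ///     let reclaimed = storage.compact()?;
    ///     println!("compaction reclaimed {reclaimed} bytes");
    /// }
    /// ```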
    pub fn compact(&mut self) -> io::Result<usize> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        // 1. Get current state
        let index = self.index.read();
        let active_count = index.len();

        if active_count == 0 {
            // Nothing to compact
            drop(index);
            return Ok(0);
        }

        // Calculate space used vs allocated
        let current_offset = self.next_offset.load(Ordering::Relaxed);
        let active_size = active_count * vector_size;

        if current_offset <= active_size {
            // No fragmentation, nothing to reclaim
            drop(index);
            return Ok(0);
        }

        let bytes_to_reclaim = current_offset - active_size;

        // 2. Create temporary file for compacted data
        let temp_path = self.path.join("vectors.dat.tmp");
        let temp_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&temp_path)?;

        // Size the temp file for active vectors
        let new_size = (active_size as u64).max(Self::INITIAL_SIZE);
        temp_file.set_len(new_size)?;

        let mut temp_mmap = unsafe { MmapMut::map_mut(&temp_file)? };

        // 3. Copy active vectors to new file with new offsets
        let mmap = self.mmap.read();
        let mut new_index: FxHashMap<u64, usize> = FxHashMap::default();
        new_index.reserve(active_count);

        let mut new_offset = 0usize;
        for (&id, &old_offset) in index.iter() {
            // Copy vector data
            let src = &mmap[old_offset..old_offset + vector_size];
            temp_mmap[new_offset..new_offset + vector_size].copy_from_slice(src);
            new_index.insert(id, new_offset);
            new_offset += vector_size;
        }

        drop(mmap);
        drop(index);

        // 4. Flush temp file
        temp_mmap.flush()?;
        drop(temp_mmap);
        drop(temp_file);

        // 5. Atomic swap: rename temp -> main
        let data_path = self.path.join("vectors.dat");
        std::fs::rename(&temp_path, &data_path)?;

        // 6. Reopen the compacted file
        let new_data_file = OpenOptions::new().read(true).write(true).open(&data_path)?;

        let new_mmap = unsafe { MmapMut::map_mut(&new_data_file)? };

        // 7. Update internal state
        *self.mmap.write() = new_mmap;
        // Replace the stale file handle so later `ensure_capacity` calls resize the
        // compacted file (compact takes `&mut self`, so a direct reassignment works;
        // the old handle is dropped here).
        self.data_file = new_data_file;

        *self.index.write() = new_index;
        self.next_offset.store(new_offset, Ordering::Relaxed);

        // 8. Write compaction marker to WAL
        {
            let mut wal = self.wal.write();
            // Op: Compact (4) - marker only, no data
            wal.write_all(&[4u8])?;
            wal.flush()?;
        }

        // 9. Save updated index
        self.flush()?;

        Ok(bytes_to_reclaim)
    }

    /// Returns the fragmentation ratio (0.0 = no fragmentation, 1.0 = 100% fragmented).
    ///
    /// Use this to decide when to trigger compaction.
    /// A ratio > 0.3 (30% fragmentation) is a good threshold.
    #[must_use]
    pub fn fragmentation_ratio(&self) -> f64 {
        let index = self.index.read();
        let active_count = index.len();
        drop(index);

        if active_count == 0 {
            return 0.0;
        }

        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_size = active_count * vector_size;
        let current_offset = self.next_offset.load(Ordering::Relaxed);

        if current_offset == 0 {
            return 0.0;
        }

        #[allow(clippy::cast_precision_loss)]
        let ratio = 1.0 - (active_size as f64 / current_offset as f64);
        ratio.max(0.0)
    }

    /// Retrieves a vector by ID without copying (zero-copy).
    ///
    /// Returns a guard that provides direct access to the mmap'd data.
    /// This is significantly faster than `retrieve()` for read-heavy workloads
    /// as it eliminates heap allocation and memory copy.
    ///
    /// # Arguments
    ///
    /// * `id` - The vector ID to retrieve
    ///
    /// # Returns
    ///
    /// - `Ok(Some(guard))` - Guard providing zero-copy access to the vector
    /// - `Ok(None)` - Vector with this ID doesn't exist
    /// - `Err(...)` - I/O error (e.g., corrupted data)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let guard = storage.retrieve_ref(id)?.unwrap();
    /// let slice: &[f32] = guard.as_ref();
    /// // Use slice directly - no allocation occurred
    /// ```
    ///
    /// # Performance
    ///
    /// Compared to `retrieve()`:
    /// - **No heap allocation** - data accessed directly from mmap
    /// - **No memcpy** - pointer arithmetic only
    /// - **Lock held** - guard must be dropped to release read lock
    ///
    /// # Errors
    ///
    /// Returns an error if the stored offset is out of bounds (corrupted index).
    pub fn retrieve_ref(&self, id: u64) -> io::Result<Option<VectorSliceGuard<'_>>> {
        // First check if ID exists (separate lock to minimize contention)
        let offset = {
            let index = self.index.read();
            match index.get(&id) {
                Some(&offset) => offset,
                None => return Ok(None),
            }
        };

        // Now acquire mmap read lock and validate bounds
        let mmap = self.mmap.read();
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        if offset + vector_size > mmap.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Offset out of bounds",
            ));
        }

        // SAFETY: We've validated that offset + vector_size <= mmap.len()
        // The pointer is derived from the mmap which is held by the guard
        // Note: mmap data is written with f32 alignment via store(), so alignment is guaranteed
        #[allow(clippy::cast_ptr_alignment)]
        let ptr = unsafe { mmap.as_ptr().add(offset).cast::<f32>() };

        Ok(Some(VectorSliceGuard {
            _guard: mmap,
            ptr,
            len: self.dimension,
        }))
    }
}

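// Summary of the WAL record formats emitted by the methods below, reconstructed
// from their write calls (all integers little-endian):
//   Store      (op 1): op(1 byte) | id(8) | len(4) | data(len bytes)
//   Delete     (op 2): op(1 byte) | id(8)
//   BatchStore (op 3): op(1 byte) | count(4), then `count` records of id(8) | len(4) | data(len)
//   Compact    (op 4): op(1 byte) marker only (written by `compact()` above)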
impl VectorStorage for MmapStorage {
    fn store(&mut self, id: u64, vector: &[f32]) -> io::Result<()> {
        if vector.len() != self.dimension {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "Vector dimension mismatch: expected {}, got {}",
                    self.dimension,
                    vector.len()
                ),
            ));
        }

        let vector_bytes: &[u8] = unsafe {
            std::slice::from_raw_parts(vector.as_ptr().cast::<u8>(), std::mem::size_of_val(vector))
        };

        // 1. Write to WAL
        {
            let mut wal = self.wal.write();
            // Op: Store (1) | ID | Len | Data
            wal.write_all(&[1u8])?;
            wal.write_all(&id.to_le_bytes())?;
            #[allow(clippy::cast_possible_truncation)]
            let len_u32 = vector_bytes.len() as u32;
            wal.write_all(&len_u32.to_le_bytes())?;
            wal.write_all(vector_bytes)?;
        }

        // 2. Determine offset
        let vector_size = vector_bytes.len();

        let (offset, is_new) = {
            let index = self.index.read();
            if let Some(&existing_offset) = index.get(&id) {
                (existing_offset, false)
            } else {
                let offset = self.next_offset.load(Ordering::Relaxed);
                self.next_offset.fetch_add(vector_size, Ordering::Relaxed);
                (offset, true)
            }
        };

        // Ensure capacity and write
        self.ensure_capacity(offset + vector_size)?;

        {
            let mut mmap = self.mmap.write();
            mmap[offset..offset + vector_size].copy_from_slice(vector_bytes);
        }

        // 3. Update Index if new
        if is_new {
            self.index.write().insert(id, offset);
        }

        Ok(())
    }

    fn store_batch(&mut self, vectors: &[(u64, &[f32])]) -> io::Result<usize> {
        if vectors.is_empty() {
            return Ok(0);
        }

        let vector_size = self.dimension * std::mem::size_of::<f32>();

        // Validate all dimensions upfront
        for (_, vector) in vectors {
            if vector.len() != self.dimension {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "Vector dimension mismatch: expected {}, got {}",
                        self.dimension,
                        vector.len()
                    ),
                ));
            }
        }

        // 1. Calculate total space needed and prepare batch WAL entry
        // Perf: Use FxHashMap for O(1) lookup instead of Vec with O(n) find
        let mut new_vector_offsets: FxHashMap<u64, usize> = FxHashMap::default();
        new_vector_offsets.reserve(vectors.len());
        let mut total_new_size = 0usize;

        {
            let index = self.index.read();
            for &(id, _) in vectors {
                if !index.contains_key(&id) {
                    let offset = self.next_offset.load(Ordering::Relaxed) + total_new_size;
                    new_vector_offsets.insert(id, offset);
                    total_new_size += vector_size;
                }
            }
        }

        // 2. Pre-allocate space for all new vectors at once
        if total_new_size > 0 {
            let start_offset = self.next_offset.load(Ordering::Relaxed);
            self.ensure_capacity(start_offset + total_new_size)?;
            self.next_offset
                .fetch_add(total_new_size, Ordering::Relaxed);
        }

        // 3. Single WAL write for entire batch (Op: BatchStore = 3)
        {
            let mut wal = self.wal.write();
            // Batch header: Op(1) | Count(4)
            wal.write_all(&[3u8])?;
            #[allow(clippy::cast_possible_truncation)]
            let count = vectors.len() as u32;
            wal.write_all(&count.to_le_bytes())?;

            // Write all vectors contiguously
            for &(id, vector) in vectors {
                let vector_bytes: &[u8] = unsafe {
                    std::slice::from_raw_parts(
                        vector.as_ptr().cast::<u8>(),
                        std::mem::size_of_val(vector),
                    )
                };
                wal.write_all(&id.to_le_bytes())?;
                #[allow(clippy::cast_possible_truncation)]
                let len_u32 = vector_bytes.len() as u32;
                wal.write_all(&len_u32.to_le_bytes())?;
                wal.write_all(vector_bytes)?;
            }
            // Note: No flush here - caller controls fsync timing
        }

        // 4. Write all vectors to mmap contiguously
        {
            let index = self.index.read();
            let mut mmap = self.mmap.write();

            for &(id, vector) in vectors {
                let vector_bytes: &[u8] = unsafe {
                    std::slice::from_raw_parts(
                        vector.as_ptr().cast::<u8>(),
                        std::mem::size_of_val(vector),
                    )
                };

                // Get offset (existing or from new_vector_offsets)
                // Perf: O(1) HashMap lookup instead of O(n) linear search
                let offset = if let Some(&existing) = index.get(&id) {
                    existing
                } else {
                    new_vector_offsets.get(&id).copied().unwrap_or(0)
                };

                mmap[offset..offset + vector_size].copy_from_slice(vector_bytes);
            }
        }

        // 5. Batch update index
        if !new_vector_offsets.is_empty() {
            let mut index = self.index.write();
            for (id, offset) in new_vector_offsets {
                index.insert(id, offset);
            }
        }

        Ok(vectors.len())
    }

    fn retrieve(&self, id: u64) -> io::Result<Option<Vec<f32>>> {
        let index = self.index.read();
        let Some(&offset) = index.get(&id) else {
            return Ok(None);
        };
        drop(index); // Release lock

        let mmap = self.mmap.read();
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        if offset + vector_size > mmap.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Offset out of bounds",
            ));
        }

        let bytes = &mmap[offset..offset + vector_size];

        // Convert bytes back to f32
        let mut vector = vec![0.0f32; self.dimension];
        unsafe {
            std::ptr::copy_nonoverlapping(
                bytes.as_ptr(),
                vector.as_mut_ptr().cast::<u8>(),
                vector_size,
            );
        }

        Ok(Some(vector))
    }

    fn delete(&mut self, id: u64) -> io::Result<()> {
        // 1. Write to WAL
        {
            let mut wal = self.wal.write();
            // Op: Delete (2) | ID
            wal.write_all(&[2u8])?;
            wal.write_all(&id.to_le_bytes())?;
        }

        // 2. Remove from Index
        let mut index = self.index.write();
        index.remove(&id);

        // Note: Space is reclaimed via compact() - see TS-CORE-004

        Ok(())
    }

    fn flush(&mut self) -> io::Result<()> {
        // 1. Flush Mmap
        self.mmap.write().flush()?;

        // 2. Flush WAL
        self.wal.write().flush()?;

        // 3. Save Index
        let index_path = self.path.join("vectors.idx");
        let file = File::create(&index_path)?;
        let index = self.index.read();
        bincode::serialize_into(io::BufWriter::new(file), &*index).map_err(io::Error::other)?;

        Ok(())
    }

    fn len(&self) -> usize {
        self.index.read().len()
    }

    fn ids(&self) -> Vec<u64> {
        self.index.read().keys().copied().collect()
    }
}
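
// A minimal smoke-test sketch of the store/retrieve/delete round trip. It is
// illustrative rather than exhaustive and assumes `std::env::temp_dir()` is
// writable; the glob import relies on this module's own `use` declarations to
// bring the `VectorStorage` trait into scope.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn store_retrieve_delete_roundtrip() {
        // Unique temp directory so parallel test runs do not collide.
        let dir = std::env::temp_dir().join(format!(
            "velesdb_mmap_test_{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("clock before UNIX_EPOCH")
                .as_nanos()
        ));

        let mut storage = MmapStorage::new(&dir, 4).expect("create storage");

        let v = [1.0f32, 2.0, 3.0, 4.0];
        storage.store(42, &v).expect("store");
        assert_eq!(storage.retrieve(42).expect("retrieve").as_deref(), Some(&v[..]));
        assert_eq!(storage.len(), 1);

        storage.delete(42).expect("delete");
        assert!(storage.retrieve(42).expect("retrieve after delete").is_none());
        assert_eq!(storage.len(), 0);

        std::fs::remove_dir_all(&dir).ok();
    }
}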