velesdb_core/storage/mmap.rs
//! Memory-mapped file storage for vectors.
//!
//! Uses a combination of an index file (ID -> offset) and a data file (raw vectors).
//! Also implements a simple WAL for durability.
//!
//! # P2 Optimization: Aggressive Pre-allocation
//!
//! To minimize blocking during `ensure_capacity` (which requires a write lock),
//! we use aggressive pre-allocation:
//! - Initial size: 16MB (vs 64KB before) - handles most small-medium datasets
//! - Growth factor: 2x minimum with 64MB floor - fewer resize operations
//! - Explicit `reserve_capacity()` for bulk imports
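//!
//! # Files
//!
//! A storage directory holds three files (created by `new()` and `flush()`):
//! `vectors.dat` (the mmap'd raw vector data), `vectors.wal` (the append-only
//! write-ahead log), and `vectors.idx` (the bincode-serialized ID -> offset index).
//!
//! # Example
//!
//! A minimal usage sketch; the path, dimension, and counts are illustrative, and
//! the import paths assume these items are re-exported from the `storage` module:
//!
//! ```rust,ignore
//! use velesdb_core::storage::{MmapStorage, VectorStorage};
//!
//! let mut storage = MmapStorage::new("./data", 768)?;
//! // Pre-allocate once so the bulk import below never hits a resize lock.
//! storage.reserve_capacity(100_000)?;
//! storage.store(42, &vec![0.5f32; 768])?;
//! storage.flush()?; // persists the mmap, WAL, and index
//! ```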

use super::guard::VectorSliceGuard;
use super::metrics::StorageMetrics;
use super::traits::VectorStorage;

use memmap2::MmapMut;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Memory-mapped file storage for vectors.
///
/// Uses a combination of an index file (ID -> offset) and a data file (raw vectors).
/// Also implements a simple WAL for durability.
#[allow(clippy::module_name_repetitions)]
pub struct MmapStorage {
    /// Directory path for storage files
    path: PathBuf,
    /// Vector dimension
    dimension: usize,
    /// In-memory index of ID -> file offset
    /// Perf: `FxHashMap` is faster than std `HashMap` for integer keys
    index: RwLock<FxHashMap<u64, usize>>,
    /// Write-Ahead Log writer
    wal: RwLock<io::BufWriter<File>>,
    /// File handle for the data file (kept open for resizing)
    data_file: File,
    /// Memory mapped data file
    mmap: RwLock<MmapMut>,
    /// Next available offset in the data file
    next_offset: AtomicUsize,
    /// P0 Audit: Metrics for monitoring `ensure_capacity` latency
    metrics: Arc<StorageMetrics>,
}

impl MmapStorage {
    /// P2: Increased from 64KB to 16MB for better initial capacity.
    /// This handles most small-medium datasets without any resize operations.
    const INITIAL_SIZE: u64 = 16 * 1024 * 1024; // 16MB initial size

    /// P2: Increased from 1MB to 64MB minimum growth.
    /// Fewer resize operations = fewer blocking write locks.
    const MIN_GROWTH: u64 = 64 * 1024 * 1024; // Minimum 64MB growth

    /// P2: Growth factor for exponential pre-allocation.
    /// Each resize at least doubles capacity for amortized O(1) growth.
    const GROWTH_FACTOR: u64 = 2;

    /// Creates a new `MmapStorage` or opens an existing one.
    ///
    /// # Arguments
    ///
    /// * `path` - Directory to store data
    /// * `dimension` - Vector dimension
    ///
    /// # Errors
    ///
    /// Returns an error if file operations fail.
    pub fn new<P: AsRef<Path>>(path: P, dimension: usize) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;

        // 1. Open/Create Data File
        let data_path = path.join("vectors.dat");
        let data_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&data_path)?;

        let file_len = data_file.metadata()?.len();
        if file_len == 0 {
            data_file.set_len(Self::INITIAL_SIZE)?;
        }

        let mmap = unsafe { MmapMut::map_mut(&data_file)? };

        // 2. Open/Create WAL
        let wal_path = path.join("vectors.wal");
        let wal_file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&wal_path)?;
        let wal = io::BufWriter::new(wal_file);

        // 3. Load Index
        let index_path = path.join("vectors.idx");
        let (index, next_offset) = if index_path.exists() {
            let file = File::open(&index_path)?;
            let index: FxHashMap<u64, usize> = bincode::deserialize_from(io::BufReader::new(file))
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

            // Calculate next_offset based on stored data
            // Simple approach: max(offset) + size
            let max_offset = index.values().max().copied().unwrap_or(0);
            let size = if index.is_empty() {
                0
            } else {
                max_offset + dimension * 4
            };
            (index, size)
        } else {
            (FxHashMap::default(), 0)
        };

        Ok(Self {
            path,
            dimension,
            index: RwLock::new(index),
            wal: RwLock::new(wal),
            data_file,
            mmap: RwLock::new(mmap),
            next_offset: AtomicUsize::new(next_offset),
            metrics: Arc::new(StorageMetrics::new()),
        })
    }

    /// Ensures the memory map is at least `required_len` bytes long.
    ///
    /// # P2 Optimization
    ///
    /// Uses aggressive pre-allocation to minimize blocking:
    /// - Exponential growth (2x) for amortized O(1)
    /// - 64MB minimum growth to reduce resize frequency
    /// - For 1M vectors × 768D × 4 bytes ≈ 3GB, only ~6 resizes needed
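    ///
    /// For example, with incremental inserts the mapped capacity roughly follows
    /// 16 MB → 80 MB → 160 MB → 320 MB → 640 MB → ...: the first resize is dominated
    /// by the 64MB floor, later ones by the 2x factor.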
    ///
    /// # P0 Audit: Latency Monitoring
    ///
    /// This operation is instrumented to track latency. Monitor P99 latency
    /// via `metrics()` to detect "stop-the-world" pauses during large resizes.
    fn ensure_capacity(&mut self, required_len: usize) -> io::Result<()> {
        let start = Instant::now();
        let mut did_resize = false;
        let mut bytes_resized = 0u64;

        let mut mmap = self.mmap.write();
        if mmap.len() < required_len {
            // Flush current mmap before unmapping
            mmap.flush()?;

            // P2: Aggressive pre-allocation strategy
            // Calculate new size with exponential growth
            let current_len = mmap.len() as u64;
            let required_u64 = required_len as u64;

            // Option 1: Double current size (exponential growth)
            let doubled = current_len.saturating_mul(Self::GROWTH_FACTOR);
            // Option 2: Required + MIN_GROWTH headroom
            let with_headroom = required_u64.saturating_add(Self::MIN_GROWTH);
            // Option 3: Just the minimum growth
            let min_growth = current_len.saturating_add(Self::MIN_GROWTH);

            // Take the maximum to ensure both sufficient space and good amortization
            let new_len = doubled.max(with_headroom).max(min_growth).max(required_u64);

            // Resize file
            self.data_file.set_len(new_len)?;

            // Remap
            *mmap = unsafe { MmapMut::map_mut(&self.data_file)? };

            did_resize = true;
            bytes_resized = new_len.saturating_sub(current_len);
        }

        // P0 Audit: Record latency metrics
        self.metrics
            .record_ensure_capacity(start.elapsed(), did_resize, bytes_resized);

        Ok(())
    }

    /// Pre-allocates storage capacity for a known number of vectors.
    ///
    /// Call this before bulk imports to avoid blocking resize operations
    /// during insertion. This is especially useful when the final dataset
    /// size is known in advance.
    ///
    /// # P2 Optimization
    ///
    /// This allows users to pre-allocate once and avoid all resize locks
    /// during bulk import operations.
    ///
    /// # Arguments
    ///
    /// * `vector_count` - Expected number of vectors to store
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Pre-allocate for 1 million vectors before bulk import
    /// storage.reserve_capacity(1_000_000)?;
    /// ```
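    ///
    /// With 768-dimensional `f32` vectors, the call above reserves roughly
    /// 1M × 768 × 4 bytes ≈ 3.1 GB, plus the 10% safety headroom added internally.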
    ///
    /// # Errors
    ///
    /// Returns an error if file operations fail.
    pub fn reserve_capacity(&mut self, vector_count: usize) -> io::Result<()> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let required_len = vector_count.saturating_mul(vector_size);

        // Add 10% headroom for safety
        let with_headroom = required_len.saturating_add(required_len / 10);

        self.ensure_capacity(with_headroom)
    }

    /// Returns a reference to the storage metrics.
    ///
    /// # P0 Audit: Latency Monitoring
    ///
    /// Use this to monitor `ensure_capacity` latency, especially P99.
    /// High P99 latency indicates "stop-the-world" pauses during mmap resizes.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let storage = MmapStorage::new(path, 768)?;
    /// // ... perform operations ...
    /// let stats = storage.metrics().ensure_capacity_latency_stats();
    /// if stats.p99_exceeds(Duration::from_millis(100)) {
    ///     warn!("High P99 latency detected: {:?}", stats.p99());
    /// }
    /// ```
    #[must_use]
    pub fn metrics(&self) -> &StorageMetrics {
        &self.metrics
    }

    /// Compacts the storage by rewriting only active vectors.
    ///
    /// This reclaims disk space from deleted vectors by:
    /// 1. Writing all active vectors to a new temporary file
    /// 2. Atomically replacing the old file with the new one
    ///
    /// # TS-CORE-004: Storage Compaction
    ///
    /// This operation is quasi-atomic via `rename()` for crash safety.
    /// Reads remain available during compaction (copy-on-write pattern).
    ///
    /// # Returns
    ///
    /// The number of bytes reclaimed.
    ///
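    /// # Example
    ///
    /// A maintenance sketch; the 0.3 threshold follows the guidance on
    /// `fragmentation_ratio()`:
    ///
    /// ```rust,ignore
    /// if storage.fragmentation_ratio() > 0.3 {
    ///     let reclaimed = storage.compact()?;
    ///     println!("compaction reclaimed {reclaimed} bytes");
    /// }
    /// ```
    ///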
    /// # Errors
    ///
    /// Returns an error if file operations fail.
    pub fn compact(&mut self) -> io::Result<usize> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        // 1. Get current state
        let index = self.index.read();
        let active_count = index.len();

        if active_count == 0 {
            // Nothing to compact
            drop(index);
            return Ok(0);
        }

        // Calculate space used vs allocated
        let current_offset = self.next_offset.load(Ordering::Relaxed);
        let active_size = active_count * vector_size;

        if current_offset <= active_size {
            // No fragmentation, nothing to reclaim
            drop(index);
            return Ok(0);
        }

        let bytes_to_reclaim = current_offset - active_size;

        // 2. Create temporary file for compacted data
        let temp_path = self.path.join("vectors.dat.tmp");
        let temp_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&temp_path)?;

        // Size the temp file for active vectors
        let new_size = (active_size as u64).max(Self::INITIAL_SIZE);
        temp_file.set_len(new_size)?;

        let mut temp_mmap = unsafe { MmapMut::map_mut(&temp_file)? };

        // 3. Copy active vectors to new file with new offsets
        let mmap = self.mmap.read();
        let mut new_index: FxHashMap<u64, usize> = FxHashMap::default();
        new_index.reserve(active_count);

        let mut new_offset = 0usize;
        for (&id, &old_offset) in index.iter() {
            // Copy vector data
            let src = &mmap[old_offset..old_offset + vector_size];
            temp_mmap[new_offset..new_offset + vector_size].copy_from_slice(src);
            new_index.insert(id, new_offset);
            new_offset += vector_size;
        }

        drop(mmap);
        drop(index);

        // 4. Flush temp file
        temp_mmap.flush()?;
        drop(temp_mmap);
        drop(temp_file);

        // 5. Atomic swap: rename temp -> main
        let data_path = self.path.join("vectors.dat");
        std::fs::rename(&temp_path, &data_path)?;

        // 6. Reopen the compacted file and adopt it as the active data file, so that
        // later resizes in `ensure_capacity` operate on the new inode, not the old one.
        self.data_file = OpenOptions::new().read(true).write(true).open(&data_path)?;

        let new_mmap = unsafe { MmapMut::map_mut(&self.data_file)? };

        // 7. Update internal state
        *self.mmap.write() = new_mmap;
        *self.index.write() = new_index;
        self.next_offset.store(new_offset, Ordering::Relaxed);

        // 8. Write compaction marker to WAL
        {
            let mut wal = self.wal.write();
            // Op: Compact (4) - marker only, no data
            wal.write_all(&[4u8])?;
            wal.flush()?;
        }

        // 9. Save updated index
        self.flush()?;

        Ok(bytes_to_reclaim)
    }

    /// Returns the fragmentation ratio (0.0 = no fragmentation, 1.0 = 100% fragmented).
    ///
    /// Use this to decide when to trigger compaction.
    /// A ratio > 0.3 (30% fragmentation) is a good threshold.
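    ///
    /// Computed as `1.0 - (active_size / next_offset)`, clamped to a minimum of 0.0,
    /// where `active_size` is the number of live vectors times the per-vector byte size.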
    #[must_use]
    pub fn fragmentation_ratio(&self) -> f64 {
        let index = self.index.read();
        let active_count = index.len();
        drop(index);

        if active_count == 0 {
            return 0.0;
        }

        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_size = active_count * vector_size;
        let current_offset = self.next_offset.load(Ordering::Relaxed);

        if current_offset == 0 {
            return 0.0;
        }

        #[allow(clippy::cast_precision_loss)]
        let ratio = 1.0 - (active_size as f64 / current_offset as f64);
        ratio.max(0.0)
    }

    /// Retrieves a vector by ID without copying (zero-copy).
    ///
    /// Returns a guard that provides direct access to the mmap'd data.
    /// This is significantly faster than `retrieve()` for read-heavy workloads
    /// as it eliminates heap allocation and memory copy.
    ///
    /// # Arguments
    ///
    /// * `id` - The vector ID to retrieve
    ///
    /// # Returns
    ///
    /// - `Ok(Some(guard))` - Guard providing zero-copy access to the vector
    /// - `Ok(None)` - Vector with this ID doesn't exist
    /// - `Err(...)` - I/O error (e.g., corrupted data)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let guard = storage.retrieve_ref(id)?.unwrap();
    /// let slice: &[f32] = guard.as_ref();
    /// // Use slice directly - no allocation occurred
    /// ```
    ///
    /// # Performance
    ///
    /// Compared to `retrieve()`:
    /// - **No heap allocation** - data accessed directly from mmap
    /// - **No memcpy** - pointer arithmetic only
    /// - **Lock held** - guard must be dropped to release read lock
    ///
    /// # Errors
    ///
    /// Returns an error if the stored offset is out of bounds (corrupted index).
    pub fn retrieve_ref(&self, id: u64) -> io::Result<Option<VectorSliceGuard<'_>>> {
        // First check if ID exists (separate lock to minimize contention)
        let offset = {
            let index = self.index.read();
            match index.get(&id) {
                Some(&offset) => offset,
                None => return Ok(None),
            }
        };

        // Now acquire mmap read lock and validate bounds
        let mmap = self.mmap.read();
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        if offset + vector_size > mmap.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Offset out of bounds",
            ));
        }

        // SAFETY: We've validated that offset + vector_size <= mmap.len()
        // The pointer is derived from the mmap which is held by the guard
        // Note: mmap data is written with f32 alignment via store(), so alignment is guaranteed
        #[allow(clippy::cast_ptr_alignment)]
        let ptr = unsafe { mmap.as_ptr().add(offset).cast::<f32>() };

        Ok(Some(VectorSliceGuard {
            _guard: mmap,
            ptr,
            len: self.dimension,
        }))
    }
}

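// WAL record layout (little-endian), as written by the trait methods below and by
// `compact()` above:
//   Store      = [1u8][id: u64][len: u32][`len` bytes of f32 data]
//   Delete     = [2u8][id: u64]
//   BatchStore = [3u8][count: u32], then `count` records of [id: u64][len: u32][data]
//   Compact    = [4u8] (marker only)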
impl VectorStorage for MmapStorage {
    fn store(&mut self, id: u64, vector: &[f32]) -> io::Result<()> {
        if vector.len() != self.dimension {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "Vector dimension mismatch: expected {}, got {}",
                    self.dimension,
                    vector.len()
                ),
            ));
        }

        let vector_bytes: &[u8] = unsafe {
            std::slice::from_raw_parts(vector.as_ptr().cast::<u8>(), std::mem::size_of_val(vector))
        };

        // 1. Write to WAL
        {
            let mut wal = self.wal.write();
            // Op: Store (1) | ID | Len | Data
            wal.write_all(&[1u8])?;
            wal.write_all(&id.to_le_bytes())?;
            #[allow(clippy::cast_possible_truncation)]
            let len_u32 = vector_bytes.len() as u32;
            wal.write_all(&len_u32.to_le_bytes())?;
            wal.write_all(vector_bytes)?;
        }

        // 2. Determine offset
        let vector_size = vector_bytes.len();

        let (offset, is_new) = {
            let index = self.index.read();
            if let Some(&existing_offset) = index.get(&id) {
                (existing_offset, false)
            } else {
                let offset = self.next_offset.load(Ordering::Relaxed);
                self.next_offset.fetch_add(vector_size, Ordering::Relaxed);
                (offset, true)
            }
        };

        // Ensure capacity and write
        self.ensure_capacity(offset + vector_size)?;

        {
            let mut mmap = self.mmap.write();
            mmap[offset..offset + vector_size].copy_from_slice(vector_bytes);
        }

        // 3. Update Index if new
        if is_new {
            self.index.write().insert(id, offset);
        }

        Ok(())
    }

    fn store_batch(&mut self, vectors: &[(u64, &[f32])]) -> io::Result<usize> {
        if vectors.is_empty() {
            return Ok(0);
        }

        let vector_size = self.dimension * std::mem::size_of::<f32>();

        // Validate all dimensions upfront
        for (_, vector) in vectors {
            if vector.len() != self.dimension {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "Vector dimension mismatch: expected {}, got {}",
                        self.dimension,
                        vector.len()
                    ),
                ));
            }
        }

        // 1. Calculate total space needed and prepare batch WAL entry
        // Perf: Use FxHashMap for O(1) lookup instead of Vec with O(n) find
        let mut new_vector_offsets: FxHashMap<u64, usize> = FxHashMap::default();
        new_vector_offsets.reserve(vectors.len());
        let mut total_new_size = 0usize;

        {
            let index = self.index.read();
            for &(id, _) in vectors {
                if !index.contains_key(&id) {
                    let offset = self.next_offset.load(Ordering::Relaxed) + total_new_size;
                    new_vector_offsets.insert(id, offset);
                    total_new_size += vector_size;
                }
            }
        }

        // 2. Pre-allocate space for all new vectors at once
        if total_new_size > 0 {
            let start_offset = self.next_offset.load(Ordering::Relaxed);
            self.ensure_capacity(start_offset + total_new_size)?;
            self.next_offset
                .fetch_add(total_new_size, Ordering::Relaxed);
        }

        // 3. Single WAL write for entire batch (Op: BatchStore = 3)
        {
            let mut wal = self.wal.write();
            // Batch header: Op(1) | Count(4)
            wal.write_all(&[3u8])?;
            #[allow(clippy::cast_possible_truncation)]
            let count = vectors.len() as u32;
            wal.write_all(&count.to_le_bytes())?;

            // Write all vectors contiguously
            for &(id, vector) in vectors {
                let vector_bytes: &[u8] = unsafe {
                    std::slice::from_raw_parts(
                        vector.as_ptr().cast::<u8>(),
                        std::mem::size_of_val(vector),
                    )
                };
                wal.write_all(&id.to_le_bytes())?;
                #[allow(clippy::cast_possible_truncation)]
                let len_u32 = vector_bytes.len() as u32;
                wal.write_all(&len_u32.to_le_bytes())?;
                wal.write_all(vector_bytes)?;
            }
            // Note: No flush here - caller controls fsync timing
        }

        // 4. Write all vectors to mmap contiguously
        {
            let index = self.index.read();
            let mut mmap = self.mmap.write();

            for &(id, vector) in vectors {
                let vector_bytes: &[u8] = unsafe {
                    std::slice::from_raw_parts(
                        vector.as_ptr().cast::<u8>(),
                        std::mem::size_of_val(vector),
                    )
                };

                // Get offset (existing or from new_vector_offsets)
                // Perf: O(1) HashMap lookup instead of O(n) linear search
                let offset = if let Some(&existing) = index.get(&id) {
                    existing
                } else {
                    new_vector_offsets.get(&id).copied().unwrap_or(0)
                };

                mmap[offset..offset + vector_size].copy_from_slice(vector_bytes);
            }
        }

        // 5. Batch update index
        if !new_vector_offsets.is_empty() {
            let mut index = self.index.write();
            for (id, offset) in new_vector_offsets {
                index.insert(id, offset);
            }
        }

        Ok(vectors.len())
    }

    fn retrieve(&self, id: u64) -> io::Result<Option<Vec<f32>>> {
        let index = self.index.read();
        let Some(&offset) = index.get(&id) else {
            return Ok(None);
        };
        drop(index); // Release lock

        let mmap = self.mmap.read();
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        if offset + vector_size > mmap.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Offset out of bounds",
            ));
        }

        let bytes = &mmap[offset..offset + vector_size];

        // Convert bytes back to f32
        let mut vector = vec![0.0f32; self.dimension];
        unsafe {
            std::ptr::copy_nonoverlapping(
                bytes.as_ptr(),
                vector.as_mut_ptr().cast::<u8>(),
                vector_size,
            );
        }

        Ok(Some(vector))
    }

    fn delete(&mut self, id: u64) -> io::Result<()> {
        // 1. Write to WAL
        {
            let mut wal = self.wal.write();
            // Op: Delete (2) | ID
            wal.write_all(&[2u8])?;
            wal.write_all(&id.to_le_bytes())?;
        }

        // 2. Remove from Index
        let mut index = self.index.write();
        index.remove(&id);

        // Note: Space is reclaimed via compact() - see TS-CORE-004

        Ok(())
    }

    fn flush(&mut self) -> io::Result<()> {
        // 1. Flush Mmap
        self.mmap.write().flush()?;

        // 2. Flush WAL
        self.wal.write().flush()?;

        // 3. Save Index
        let index_path = self.path.join("vectors.idx");
        let file = File::create(&index_path)?;
        let index = self.index.read();
        bincode::serialize_into(io::BufWriter::new(file), &*index).map_err(io::Error::other)?;

        Ok(())
    }

    fn len(&self) -> usize {
        self.index.read().len()
    }

    fn ids(&self) -> Vec<u64> {
        self.index.read().keys().copied().collect()
    }
}