mod vector_io;
mod wal_replay;
use super::compaction;
use super::guard::VectorSliceGuard;
use super::log_payload::DurabilityMode;
use super::metrics::StorageMetrics;
use super::sharded_index::ShardedIndex;
use super::traits::VectorStorage;
use crate::metrics::global_guardrails_metrics;
use memmap2::MmapMut;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use tracing::error;
/// Memory-mapped vector storage rooted at a single directory.
///
/// The constructor opens three files inside `path`: `vectors.dat` (mapped into
/// memory), `vectors.wal` (append-only log) and `vectors.idx` (serialized index).
#[allow(clippy::module_name_repetitions)]
pub struct MmapStorage {
/// Directory that contains `vectors.dat`, `vectors.wal` and `vectors.idx`.
pub(super) path: PathBuf,
/// Number of `f32` components per stored vector.
pub(super) dimension: usize,
/// Maps a vector id (`u64`) to its byte offset inside the mapped data file.
pub(super) index: ShardedIndex,
/// Buffered writer over the append-mode `vectors.wal` file.
pub(super) wal: RwLock<io::BufWriter<File>>,
/// Handle to `vectors.dat`, the file backing `mmap`.
pub(super) data_file: File,
/// Mutable memory map of `data_file`; readers hold the read lock for as long
/// as a `VectorSliceGuard` is alive.
pub(super) mmap: RwLock<MmapMut>,
/// Byte offset seeded from the loaded index and WAL replay.
/// Presumably the position at which the next vector is written — TODO confirm
/// against the write path, which is not visible here.
pub(super) next_offset: AtomicUsize,
/// Shared storage metrics, exposed through `metrics()`.
pub(super) metrics: Arc<StorageMetrics>,
/// Counter that starts at 0; every `VectorSliceGuard` records the value it
/// observed at creation. Presumably incremented whenever the mapping is
/// replaced — TODO confirm against the remap code.
pub(super) remap_epoch: AtomicU64,
/// Durability mode selected at construction or via `set_durability_mode`.
pub(super) durability: DurabilityMode,
}
impl MmapStorage {
/// Length in bytes (16 MiB) given to a data file that is found empty on open.
pub(super) const INITIAL_SIZE: u64 = 16 * 1024 * 1024;
/// 64 MiB. Presumably the smallest step by which the data file is grown —
/// not used in this part of the file, confirm against the resize code.
pub(super) const MIN_GROWTH: u64 = 64 * 1024 * 1024;
/// Presumably the multiplier applied to the data file size when it grows —
/// not used in this part of the file, confirm against the resize code.
pub(super) const GROWTH_FACTOR: u64 = 2;
/// Opens (or creates) the storage rooted at `path` with the default
/// [`DurabilityMode`].
///
/// # Errors
///
/// Returns whatever I/O error `new_with_durability` reports.
pub fn new<P: AsRef<Path>>(path: P, dimension: usize) -> io::Result<Self> {
    let durability = DurabilityMode::default();
    Self::new_with_durability(path, dimension, durability)
}
/// Opens (or creates) the storage rooted at `path` with an explicit
/// durability mode.
///
/// Steps, in order: create the directory, let the compaction module recover
/// any leftover artifacts for `vectors.dat`, open and map the data file, open
/// the WAL for appending, load `vectors.idx`, then replay the WAL on top of
/// the loaded index.
///
/// # Errors
///
/// Returns the first I/O error raised by any of those steps.
pub fn new_with_durability<P: AsRef<Path>>(
    path: P,
    dimension: usize,
    durability: DurabilityMode,
) -> io::Result<Self> {
    let root = path.as_ref().to_path_buf();
    std::fs::create_dir_all(&root)?;

    // All three files live directly inside the storage directory.
    let data_path = root.join("vectors.dat");
    let wal_path = root.join("vectors.wal");
    let index_path = root.join("vectors.idx");

    // Recovery runs before the data file is opened and mapped.
    compaction::recover_compaction_artifacts(&data_path)?;
    let data_file = Self::open_data_file(&data_path)?;
    let initial_map = Self::create_initial_mmap(&data_file)?;
    let wal_writer = Self::open_wal(&wal_path)?;

    // The persisted index supplies the starting offset; WAL replay may move
    // it and hands the mapping back afterwards.
    let (index, indexed_end) = Self::load_index(&index_path, dimension)?;
    let (replayed_map, end_offset) =
        Self::replay_wal(initial_map, indexed_end, &wal_path, &index, dimension)?;

    Ok(Self {
        path: root,
        dimension,
        index,
        wal: RwLock::new(wal_writer),
        data_file,
        mmap: RwLock::new(replayed_map),
        next_offset: AtomicUsize::new(end_offset),
        metrics: Arc::new(StorageMetrics::new()),
        remap_epoch: AtomicU64::new(0),
        durability,
    })
}
/// Returns the durability mode this storage is currently configured with.
#[must_use]
pub fn durability(&self) -> DurabilityMode {
self.durability
}
/// Replaces the configured durability mode with `mode`.
///
/// Only the stored field is updated here; nothing is flushed or synced by
/// this call.
pub fn set_durability_mode(&mut self, mode: DurabilityMode) {
self.durability = mode;
}
/// Returns a shared reference to this storage's metrics.
#[must_use]
pub fn metrics(&self) -> &StorageMetrics {
// Deref coercion turns `&Arc<StorageMetrics>` into `&StorageMetrics`.
&self.metrics
}
/// Opens `data_path` for reading and writing, creating it when missing and
/// never truncating existing content.
///
/// A file whose length is zero is extended to `INITIAL_SIZE` bytes so that it
/// can be memory-mapped afterwards.
///
/// # Errors
///
/// Returns any I/O error from opening the file, reading its metadata, or
/// resizing it.
fn open_data_file(data_path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(false);
    let file = options.open(data_path)?;

    let is_empty = file.metadata()?.len() == 0;
    if is_empty {
        file.set_len(Self::INITIAL_SIZE)?;
    }
    Ok(file)
}
/// Maps the whole of `data_file` into memory as a writable mapping.
///
/// # Errors
///
/// Returns the I/O error reported by `MmapMut::map_mut`.
fn create_initial_mmap(data_file: &File) -> io::Result<MmapMut> {
    // SAFETY: NOTE(review) — mapping a file is only sound while nothing else
    // truncates or rewrites it behind the mapping. This code cannot check
    // that; it assumes `vectors.dat` is owned exclusively by this storage
    // instance. Confirm with the callers/deployment model.
    let mapping = unsafe { MmapMut::map_mut(data_file)? };
    Ok(mapping)
}
/// Opens the write-ahead log at `wal_path` in append mode, creating the file
/// when it does not exist, and wraps it in a buffered writer.
///
/// # Errors
///
/// Returns the I/O error from opening the file.
fn open_wal(wal_path: &Path) -> io::Result<io::BufWriter<File>> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    options.open(wal_path).map(io::BufWriter::new)
}
/// Loads the persisted id → offset index from `index_path`.
///
/// Returns the index together with the byte offset just past the last indexed
/// vector (`max_offset + dimension * size_of::<f32>()`), or `0` when the
/// index is empty. A missing index file yields an empty index and offset `0`.
///
/// # Errors
///
/// * Any I/O error other than `NotFound` while reading the file. An index
///   that exists but cannot be read is reported instead of being silently
///   treated as absent.
/// * `InvalidData` when the bytes are not a valid serialized index.
/// * `InvalidData` when computing the end offset would overflow `usize`,
///   which indicates a corrupt offset in the file.
fn load_index(index_path: &Path, dimension: usize) -> io::Result<(ShardedIndex, usize)> {
    // Read directly instead of probing with `exists()`: `exists()` reports
    // `false` for every metadata error, which would make an unreadable index
    // look like a brand-new store.
    let bytes = match std::fs::read(index_path) {
        Ok(bytes) => bytes,
        Err(e) if e.kind() == io::ErrorKind::NotFound => {
            return Ok((ShardedIndex::new(), 0));
        }
        Err(e) => return Err(e),
    };

    let flat_index: FxHashMap<u64, usize> = postcard::from_bytes(&bytes)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

    let size = match flat_index.values().copied().max() {
        None => 0,
        // Offsets come from disk, so the arithmetic is checked: a wrapped
        // result would yield a small bogus end offset.
        Some(max_offset) => dimension
            .checked_mul(std::mem::size_of::<f32>())
            .and_then(|vector_size| max_offset.checked_add(vector_size))
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Offset arithmetic overflow while loading index",
                )
            })?,
    };

    Ok((ShardedIndex::from_hashmap(flat_index), size))
}
/// Replays the write-ahead log at `wal_path` into `index` and `mmap`.
///
/// Takes ownership of the mapping and the starting offset, lets
/// `wal_replay::replay_wal_to_index` update both in place, and returns them.
/// The mapping is flushed only when the replay helper reports a value
/// greater than zero (presumably the number of entries it applied — confirm
/// against `wal_replay`).
///
/// # Errors
///
/// Returns any I/O error from the replay helper or from flushing the mapping.
fn replay_wal(
    mut mmap: MmapMut,
    mut next_offset: usize,
    wal_path: &Path,
    index: &ShardedIndex,
    dimension: usize,
) -> io::Result<(MmapMut, usize)> {
    let applied =
        wal_replay::replay_wal_to_index(wal_path, index, dimension, &mut mmap, &mut next_offset)?;

    // Skip the flush entirely when replay reported nothing.
    let needs_flush = applied > 0;
    if needs_flush {
        mmap.flush()?;
    }

    Ok((mmap, next_offset))
}
/// Looks up vector `id` and returns a guard that borrows it straight from the
/// memory map, without copying.
///
/// Returns `Ok(None)` when `id` is not in the index. The returned guard keeps
/// the map's read lock for its whole lifetime and remembers the remap epoch
/// observed at creation.
///
/// # Errors
///
/// Returns `InvalidData` when the indexed offset fails `validate_offset`
/// (overflow, out of bounds, or misaligned).
pub fn retrieve_ref(&self, id: u64) -> io::Result<Option<VectorSliceGuard<'_>>> {
    let Some(byte_offset) = self.index.get(id) else {
        return Ok(None);
    };

    // The read lock is taken before validating so the checked length is the
    // length of the mapping the pointer is derived from.
    let map_guard = self.mmap.read();
    let byte_len = self.dimension * std::mem::size_of::<f32>();
    Self::validate_offset(byte_offset, byte_len, map_guard.len())?;

    // SAFETY: `validate_offset` has just confirmed that
    // `byte_offset + byte_len` fits inside the mapping and that
    // `byte_offset` is a multiple of `align_of::<f32>()`. This assumes the
    // mapping's base address is itself at least f32-aligned.
    #[allow(clippy::cast_ptr_alignment)]
    let ptr = unsafe { map_guard.as_ptr().add(byte_offset).cast::<f32>() };

    let epoch_at_creation = self.remap_epoch.load(Ordering::Acquire);
    let guard = VectorSliceGuard {
        _guard: map_guard,
        ptr,
        len: self.dimension,
        epoch_ptr: &self.remap_epoch,
        epoch_at_creation,
    };
    Ok(Some(guard))
}
fn validate_offset(offset: usize, vector_size: usize, mmap_len: usize) -> io::Result<()> {
let end = offset.checked_add(vector_size).ok_or_else(|| {
global_guardrails_metrics().record_invalid_offset_read_error();
io::Error::new(
io::ErrorKind::InvalidData,
"Offset arithmetic overflow while reading vector",
)
})?;
if end > mmap_len {
global_guardrails_metrics().record_invalid_offset_read_error();
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Offset out of bounds",
));
}
if offset % std::mem::align_of::<f32>() != 0 {
global_guardrails_metrics().record_invalid_offset_read_error();
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"EPIC-032/US-001: offset {offset} is not f32-aligned (must be multiple of {})",
std::mem::align_of::<f32>()
),
));
}
Ok(())
}
/// Serializes the in-memory index and writes it to `vectors.idx` inside the
/// storage directory, then fsyncs the file.
///
/// # Errors
///
/// Returns an error when serialization fails, or when creating, writing or
/// syncing the index file fails. A serialization failure leaves the existing
/// index file untouched.
pub fn flush_index(&self) -> io::Result<()> {
    // Serialize before touching the file: `File::create` truncates the
    // previous snapshot, so failing after it would leave a truncated index.
    let flat_index = self.index.to_hashmap();
    let bytes = postcard::to_allocvec(&flat_index).map_err(io::Error::other)?;

    // NOTE(review): the write itself is still not atomic — a crash between
    // truncation and `sync_all` can leave a partial file. Writing to a
    // temporary file and renaming over `vectors.idx` would close that gap.
    let mut file = File::create(self.path.join("vectors.idx"))?;
    // The payload is one contiguous buffer, so it is written straight
    // through without an intermediate `BufWriter`.
    file.write_all(&bytes)?;
    file.sync_all()
}
/// Flushes the storage and then persists the index.
///
/// `flush` is not defined in this part of the file — presumably the
/// `VectorStorage` trait implementation; confirm there what it covers.
///
/// # Errors
///
/// Returns the first error from `flush`; `flush_index` is not attempted in
/// that case. Otherwise returns the result of `flush_index`.
pub fn flush_full(&mut self) -> io::Result<()> {
self.flush()?;
self.flush_index()
}
/// Attempts to flush the WAL and then the memory map without blocking and
/// without returning errors.
///
/// Each step is skipped when its lock cannot be acquired immediately;
/// failures are logged by the helpers. The index file is not written here.
pub(crate) fn flush_on_shutdown_best_effort(&self) {
self.try_flush_wal();
self.try_flush_mmap();
}
/// Flushes the buffered WAL writer and fsyncs the underlying file, if the
/// WAL write lock is free right now.
///
/// Does nothing when the lock is held elsewhere. Both steps are always
/// attempted once the lock is held; each failure is logged and otherwise
/// ignored.
fn try_flush_wal(&self) {
    let Some(mut wal) = self.wal.try_write() else {
        return;
    };

    if let Err(e) = wal.flush() {
        error!(?e, "Failed to flush WAL in MmapStorage shutdown path");
    }

    // The fsync is attempted even when the buffer flush above failed.
    let wal_file = wal.get_ref();
    if let Err(e) = wal_file.sync_all() {
        error!(?e, "Failed to fsync WAL in MmapStorage shutdown path");
    }
}
/// Flushes the memory map to disk, if the map's write lock is free right now.
///
/// Does nothing when the lock is held elsewhere (including by readers); a
/// flush failure is logged and otherwise ignored.
fn try_flush_mmap(&self) {
    // NOTE(review): `MmapMut::flush` takes `&self`, so a read lock might be
    // enough here and would not be blocked by live read guards — confirm
    // whether exclusivity is intentional before changing it.
    let Some(mmap) = self.mmap.try_write() else {
        return;
    };

    if let Err(e) = mmap.flush() {
        error!(?e, "Failed to flush mmap in MmapStorage shutdown path");
    }
}
}
/// Runs the best-effort shutdown flush when the storage goes out of scope.
impl Drop for MmapStorage {
/// Delegates to `flush_on_shutdown_best_effort`. That routine logs failures
/// instead of returning them, so nothing is propagated from here.
fn drop(&mut self) {
self.flush_on_shutdown_best_effort();
}
}