use crate::storage::log_payload::crc32_hash;
use crate::storage::sharded_index::ShardedIndex;
use memmap2::MmapMut;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Seek};
use std::path::Path;
/// Smallest possible store frame: opcode (1) + id (8) + length (4) + empty payload + CRC (4).
const MIN_STORE_ENTRY: u64 = 1 + 8 + 4 + 4;
/// Fixed size of a delete frame: opcode (1) + id (8) + CRC (4).
const DELETE_ENTRY_SIZE: u64 = 1 + 8 + 4;
/// Result of replaying a single WAL frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntryOutcome {
    /// The frame was consumed and replay should move on to the next one.
    /// This covers an applied store/delete, an opcode-4 frame, and a skipped
    /// mid-stream corrupt frame.
    Applied,
    /// Replay ends here: end of file, an incomplete trailing frame, a corrupt
    /// frame at the tail, or an unknown opcode.
    Stop,
}
impl EntryOutcome {
    /// Returns `true` when replay should proceed to the next WAL frame.
    const fn should_continue(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide whether it continues.
        match self {
            Self::Applied => true,
            Self::Stop => false,
        }
    }
}
/// The vector data file that WAL replay writes into, together with its live mapping.
struct ReplayTarget<'a> {
    /// File handle used to grow the data file and to create replacement mappings.
    data_file: &'a File,
    /// Current writable mapping of `data_file`; replaced whenever the file grows.
    mmap: &'a mut MmapMut,
}
impl ReplayTarget<'_> {
    /// Makes at least `required_len` bytes addressable through `self.mmap`.
    ///
    /// When the current mapping is too short, the file is grown to
    /// `required_len + MmapStorage::MIN_GROWTH` and remapped. The extra slack
    /// lets later small appends return early without remapping. The file is
    /// never shrunk. If it is already at least that large (for example because
    /// it was extended through another handle after this mapping was created),
    /// it is remapped at its current length.
    ///
    /// # Errors
    /// Returns `InvalidData` if `required_len` does not fit in `u64`. Propagates
    /// any flush, metadata, `set_len`, or mmap error.
    fn ensure_capacity(&mut self, required_len: usize) -> io::Result<()> {
        if self.mmap.len() >= required_len {
            return Ok(());
        }
        // Persist dirty pages of the old mapping before it is replaced.
        self.mmap.flush()?;
        let required_u64 = u64::try_from(required_len)
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "replay length overflow"))?;
        let new_len = required_u64.saturating_add(super::MmapStorage::MIN_GROWTH);
        // `File::set_len` truncates when given a smaller size; only ever grow.
        if self.data_file.metadata()?.len() < new_len {
            self.data_file.set_len(new_len)?;
        }
        // SAFETY: memmap2 requires that the mapped file is not truncated or
        // modified out from under the mapping by other handles or processes
        // while it is live. NOTE(review): this relies on the storage layer
        // holding exclusive access to `data_file` during replay — confirm.
        // The previous mapping was flushed above and is dropped by this assignment.
        *self.mmap = unsafe { MmapMut::map_mut(self.data_file)? };
        Ok(())
    }
}
/// Replays CRC-framed WAL entries from `wal_path` into the mmap-backed vector store.
///
/// Handling by frame type:
/// - Store frames with a payload of exactly `dimension` `f32`s are written into
///   `mmap` (growing `data_file` as needed) and recorded in `index`.
/// - Delete frames remove their id from `index`.
///
/// Every id touched this way is appended to `touched_ids`. `next_offset` is
/// advanced by the store path when it appends past the current end of the
/// data region.
///
/// Returns `Ok(0)` without replaying anything when the WAL is missing, empty,
/// or not in the CRC-framed format. Otherwise it returns the number of frames
/// consumed before replay stopped. That count includes opcode-4 frames and
/// skipped mid-stream corrupt frames, not only applied mutations.
///
/// # Errors
/// Returns `InvalidInput` if `dimension * size_of::<f32>()` overflows `usize`.
/// Propagates I/O, remapping, and offset-overflow errors from replay.
#[allow(clippy::module_name_repetitions)]
pub(crate) fn replay_wal_to_index(
    wal_path: &Path,
    index: &ShardedIndex,
    dimension: usize,
    mmap: &mut MmapMut,
    data_file: &File,
    next_offset: &mut usize,
    touched_ids: &mut Vec<u64>,
) -> io::Result<usize> {
    let Some((mut reader, file_len)) = open_crc_wal(wal_path)? else {
        return Ok(0);
    };
    // Checked: an unchecked multiply would silently wrap in release builds and
    // make every valid store frame look like a size mismatch.
    let vector_size = dimension
        .checked_mul(std::mem::size_of::<f32>())
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "vector dimension overflow"))?;
    let mut target = ReplayTarget { mmap, data_file };
    drain_wal_entries(
        &mut reader,
        file_len,
        index,
        &mut target,
        next_offset,
        vector_size,
        touched_ids,
    )
}
/// Empties the WAL file at `wal_path` and syncs the new length to disk.
///
/// # Errors
/// Fails if the file cannot be opened for writing (including when it does not
/// exist, since it is not created), resized, or synced.
pub(crate) fn truncate_wal(wal_path: &Path) -> io::Result<()> {
    let wal = OpenOptions::new().write(true).open(wal_path)?;
    wal.set_len(0).and_then(|()| wal.sync_all())
}
/// Opens the WAL for replay if it exists, is non-empty, and starts with a
/// CRC-framed entry.
///
/// Returns `Ok(None)` when there is nothing to replay in the CRC format: the
/// file is missing or empty, or its first frame does not validate. On success
/// it returns a reader positioned at offset 0 and the file length read right
/// after opening.
///
/// # Errors
/// Propagates any error from opening the file other than `NotFound`. Also
/// propagates errors from reading its metadata or validating the first frame.
fn open_crc_wal(wal_path: &Path) -> io::Result<Option<(BufReader<File>, u64)>> {
    // Open directly instead of probing with `Path::exists`. `exists` returns
    // `false` on *any* metadata error (e.g. permission denied), which would
    // silently skip replay of a WAL that is present. Only a genuinely missing
    // file means "nothing to replay". This also removes the exists/open race.
    let file = match File::open(wal_path) {
        Ok(file) => file,
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err),
    };
    let file_len = file.metadata()?.len();
    if file_len == 0 || !is_crc_framed_wal(wal_path, file_len)? {
        return Ok(None);
    }
    // Reuse the handle opened above. Nothing has read from it, so its cursor
    // is still at offset 0.
    Ok(Some((BufReader::new(file), file_len)))
}
/// Replays frames one at a time until `replay_one_entry` reports `Stop`.
///
/// Returns how many frames reported `Applied` before that point.
// Replay state is threaded explicitly through each stage rather than bundled.
#[allow(clippy::too_many_arguments)]
fn drain_wal_entries(
    reader: &mut BufReader<File>,
    file_len: u64,
    index: &ShardedIndex,
    target: &mut ReplayTarget<'_>,
    next_offset: &mut usize,
    vector_size: usize,
    touched_ids: &mut Vec<u64>,
) -> io::Result<usize> {
    let mut consumed = 0usize;
    loop {
        let outcome = replay_one_entry(
            reader,
            file_len,
            index,
            target,
            next_offset,
            vector_size,
            touched_ids,
        )?;
        if !outcome.should_continue() {
            return Ok(consumed);
        }
        consumed += 1;
    }
}
/// Decides whether the WAL at `wal_path` uses the CRC-framed format.
///
/// It skips any leading opcode-4 bytes, then fully validates the first store
/// (opcode 1) or delete (opcode 2) frame. A short file, a read failure, or any
/// other first opcode yields `false`.
///
/// # Errors
/// Propagates failures to open the file and errors from store-frame validation.
fn is_crc_framed_wal(wal_path: &Path, file_len: u64) -> io::Result<bool> {
    // Too short to hold even the smallest complete frame.
    if file_len < MIN_STORE_ENTRY.min(DELETE_ENTRY_SIZE) {
        return Ok(false);
    }
    let mut reader = BufReader::new(File::open(wal_path)?);
    let mut op = [0u8; 1];
    let first_op = loop {
        match reader.read_exact(&mut op) {
            Err(_) => return Ok(false),
            Ok(()) if op[0] == 4 => continue,
            Ok(()) => break op[0],
        }
    };
    match first_op {
        1 => validate_first_store_crc(&mut reader, file_len),
        2 => Ok(validate_first_delete_crc(&mut reader)),
        _ => Ok(false),
    }
}
fn validate_first_store_crc(reader: &mut BufReader<File>, file_len: u64) -> io::Result<bool> {
let mut id_bytes = [0u8; 8];
let mut len_bytes = [0u8; 4];
if reader.read_exact(&mut id_bytes).is_err() || reader.read_exact(&mut len_bytes).is_err() {
return Ok(false);
}
let Some(data_len) = checked_store_data_len(len_bytes, reader, file_len)? else {
return Ok(false);
};
let mut data = vec![0u8; data_len];
if reader.read_exact(&mut data).is_err() {
return Ok(false);
}
let mut stored_crc = [0u8; 4];
if reader.read_exact(&mut stored_crc).is_err() {
return Ok(false);
}
Ok(store_crc_matches(id_bytes, len_bytes, &data, stored_crc))
}
/// Returns whether the delete frame that follows an already-consumed opcode
/// byte is complete (8-byte id + 4-byte CRC) and its CRC matches.
///
/// The CRC covers the opcode byte `2` followed by the id bytes. Any read
/// failure, including running out of data, yields `false`.
fn validate_first_delete_crc(reader: &mut BufReader<File>) -> bool {
    let mut id_bytes = [0u8; 8];
    let mut stored_crc = [0u8; 4];
    let complete =
        reader.read_exact(&mut id_bytes).is_ok() && reader.read_exact(&mut stored_crc).is_ok();
    if !complete {
        return false;
    }
    let frame: Vec<u8> = std::iter::once(2u8)
        .chain(id_bytes.iter().copied())
        .collect();
    crc32_hash(&frame) == u32::from_le_bytes(stored_crc)
}
#[allow(clippy::too_many_arguments)] fn replay_one_entry(
reader: &mut BufReader<File>,
file_len: u64,
index: &ShardedIndex,
target: &mut ReplayTarget<'_>,
next_offset: &mut usize,
vector_size: usize,
touched_ids: &mut Vec<u64>,
) -> io::Result<EntryOutcome> {
let pos = reader.stream_position()?;
if pos >= file_len {
return Ok(EntryOutcome::Stop);
}
let mut op = [0u8; 1];
if reader.read_exact(&mut op).is_err() {
return Ok(EntryOutcome::Stop);
}
match op[0] {
1 => replay_store(
reader,
file_len,
index,
target,
next_offset,
vector_size,
touched_ids,
),
2 => replay_delete(reader, file_len, index, touched_ids),
4 => Ok(EntryOutcome::Applied),
_ => {
if reader.stream_position()? < file_len {
crate::metrics::global_guardrails_metrics().record_wal_replay_corrupt_entry();
}
Ok(EntryOutcome::Stop)
}
}
}
/// Reads the body of a store frame (id, length, payload, CRC) after its
/// opcode byte has been consumed.
///
/// Returns `Ok(None)` when the frame is incomplete or declares a payload that
/// would run past `file_len`. Otherwise returns `Some((id, payload, crc_ok))`,
/// where `crc_ok` reports whether the stored CRC matches the frame.
///
/// # Errors
/// Propagates errors from `checked_store_data_len`.
fn read_store_entry(
    reader: &mut BufReader<File>,
    file_len: u64,
) -> io::Result<Option<(u64, Vec<u8>, bool)>> {
    let mut id_bytes = [0u8; 8];
    let mut len_bytes = [0u8; 4];
    let header_ok =
        reader.read_exact(&mut id_bytes).is_ok() && reader.read_exact(&mut len_bytes).is_ok();
    if !header_ok {
        return Ok(None);
    }
    let payload_len = match checked_store_data_len(len_bytes, reader, file_len)? {
        Some(len) => len,
        None => return Ok(None),
    };
    let mut payload = vec![0u8; payload_len];
    let mut stored_crc = [0u8; 4];
    if reader.read_exact(&mut payload).is_err() || reader.read_exact(&mut stored_crc).is_err() {
        return Ok(None);
    }
    let crc_ok = store_crc_matches(id_bytes, len_bytes, &payload, stored_crc);
    Ok(Some((u64::from_le_bytes(id_bytes), payload, crc_ok)))
}
/// Decodes the little-endian payload length and checks that the payload plus
/// its trailing 4-byte CRC fits in the bytes left before `file_len`.
///
/// Returns `Ok(None)` when the declared payload would run past the end of the
/// file (for example a truncated final frame). Callers therefore never
/// allocate a buffer sized by an unchecked length field.
///
/// # Errors
/// Propagates `stream_position` failures. Returns `InvalidData` if the length
/// does not fit in `usize`.
fn checked_store_data_len(
    len_bytes: [u8; 4],
    reader: &mut BufReader<File>,
    file_len: u64,
) -> io::Result<Option<usize>> {
    let declared = u64::from(u32::from_le_bytes(len_bytes));
    let bytes_left = file_len.saturating_sub(reader.stream_position()?);
    let needed = declared.saturating_add(4);
    if needed <= bytes_left {
        usize::try_from(declared)
            .map(Some)
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "data_len overflow"))
    } else {
        Ok(None)
    }
}
/// Reports whether the reader's cursor has reached or passed `file_len`.
fn is_at_tail(reader: &mut BufReader<File>, file_len: u64) -> io::Result<bool> {
    let position = reader.stream_position()?;
    Ok(file_len <= position)
}
/// Recomputes the CRC of a store frame and compares it with `stored_crc`.
///
/// The checksum covers the opcode byte `1`, the raw id bytes, the raw length
/// bytes, and the payload, in that order. `stored_crc` is interpreted as
/// little-endian.
fn store_crc_matches(
    id_bytes: [u8; 8],
    len_bytes: [u8; 4],
    data: &[u8],
    stored_crc: [u8; 4],
) -> bool {
    let expected = u32::from_le_bytes(stored_crc);
    let frame: Vec<u8> = [&[1u8][..], &id_bytes[..], &len_bytes[..], data].concat();
    crc32_hash(&frame) == expected
}
/// Replays one store frame whose opcode byte has already been consumed.
///
/// Outcomes:
/// - An incomplete or out-of-bounds frame stops replay.
/// - A CRC mismatch at the tail stops replay.
/// - A CRC mismatch mid-stream is recorded as corrupt, logged, and skipped.
/// - A valid frame whose payload is not exactly `vector_size` bytes is logged
///   and skipped.
/// - Otherwise the vector is written to the mmap and its id is appended to
///   `touched_ids`.
///
/// # Errors
/// Propagates I/O errors from reading the frame and errors from
/// `apply_store_to_mmap`.
// Replay state is threaded explicitly through each stage rather than bundled.
#[allow(clippy::too_many_arguments)]
fn replay_store(
    reader: &mut BufReader<File>,
    file_len: u64,
    index: &ShardedIndex,
    target: &mut ReplayTarget<'_>,
    next_offset: &mut usize,
    vector_size: usize,
    touched_ids: &mut Vec<u64>,
) -> io::Result<EntryOutcome> {
    let Some((id, data, crc_ok)) = read_store_entry(reader, file_len)? else {
        // Incomplete or out-of-bounds frame: the next frame cannot be located.
        return Ok(EntryOutcome::Stop);
    };
    if !crc_ok {
        if is_at_tail(reader, file_len)? {
            return Ok(EntryOutcome::Stop);
        }
        crate::metrics::global_guardrails_metrics().record_wal_replay_corrupt_entry();
        tracing::warn!(id, "WAL replay: skipping mid-stream corrupt store entry");
        return Ok(EntryOutcome::Applied);
    }
    if data.len() != vector_size {
        // Previously dropped with no signal; surface it so a dimension
        // mismatch does not look like a successful replay.
        tracing::warn!(
            id,
            len = data.len(),
            expected = vector_size,
            "WAL replay: skipping store entry with unexpected vector size"
        );
        return Ok(EntryOutcome::Applied);
    }
    apply_store_to_mmap(id, &data, index, target, next_offset, vector_size)?;
    touched_ids.push(id);
    Ok(EntryOutcome::Applied)
}
/// Writes `data` into the vector slot for `id` and records the slot in `index`.
///
/// An id already in `index` is overwritten in place. A new id gets the slot at
/// `*next_offset`. The mapping is grown as needed to cover the slot. Afterwards
/// `*next_offset` is at least the end of the written slot, so later appends
/// never overlap it.
///
/// `data` must be exactly `vector_size` bytes long; `copy_from_slice` panics
/// otherwise.
///
/// # Errors
/// Returns `InvalidData` if `offset + vector_size` overflows `usize`.
/// Propagates errors from growing or remapping the data file.
fn apply_store_to_mmap(
    id: u64,
    data: &[u8],
    index: &ShardedIndex,
    target: &mut ReplayTarget<'_>,
    next_offset: &mut usize,
    vector_size: usize,
) -> io::Result<()> {
    let offset = index.get(id).unwrap_or(*next_offset);
    let end = offset
        .checked_add(vector_size)
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "WAL replay offset overflow"))?;
    target.ensure_capacity(end)?;
    target.mmap[offset..end].copy_from_slice(data);
    index.insert(id, offset);
    // Advance the append cursor whenever this slot ends beyond it. Advancing
    // only on an exact `offset == next_offset` match would leave the cursor
    // inside an existing slot that extends past it, and the next new id would
    // then overwrite that slot's vector.
    if end > *next_offset {
        *next_offset = end;
    }
    Ok(())
}
/// Replays one delete frame (8-byte id + 4-byte CRC) whose opcode byte has
/// already been consumed.
///
/// Outcomes:
/// - Fewer than 12 bytes remaining, or a failed read, stops replay.
/// - A valid CRC removes the id from `index` and appends it to `touched_ids`.
/// - A CRC mismatch at the tail stops replay.
/// - A CRC mismatch mid-stream is recorded as corrupt, logged, and skipped.
///
/// # Errors
/// Propagates `stream_position` failures.
fn replay_delete(
    reader: &mut BufReader<File>,
    file_len: u64,
    index: &ShardedIndex,
    touched_ids: &mut Vec<u64>,
) -> io::Result<EntryOutcome> {
    let remaining = file_len.saturating_sub(reader.stream_position()?);
    if remaining < 8 + 4 {
        return Ok(EntryOutcome::Stop);
    }
    let mut id_bytes = [0u8; 8];
    let mut stored_crc = [0u8; 4];
    if reader.read_exact(&mut id_bytes).is_err() || reader.read_exact(&mut stored_crc).is_err() {
        return Ok(EntryOutcome::Stop);
    }
    let frame: Vec<u8> = std::iter::once(2u8)
        .chain(id_bytes.iter().copied())
        .collect();
    let crc_ok = crc32_hash(&frame) == u32::from_le_bytes(stored_crc);
    if !crc_ok {
        if is_at_tail(reader, file_len)? {
            return Ok(EntryOutcome::Stop);
        }
        crate::metrics::global_guardrails_metrics().record_wal_replay_corrupt_entry();
        tracing::warn!("WAL replay: skipping mid-stream corrupt delete entry");
        return Ok(EntryOutcome::Applied);
    }
    let id = u64::from_le_bytes(id_bytes);
    index.remove(id);
    touched_ids.push(id);
    Ok(EntryOutcome::Applied)
}