use crate::storage::log_payload::crc32_hash;
use crate::storage::sharded_index::ShardedIndex;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Seek};
use std::path::Path;
/// Smallest possible store frame in bytes: 1 (op) + 8 (id) + 4 (payload length) + 4 (CRC),
/// i.e. a store entry that carries an empty payload.
const MIN_STORE_ENTRY: usize = 17;
/// Size of a delete frame in bytes: 1 (op) + 8 (id) + 4 (CRC).
const DELETE_ENTRY_SIZE: usize = 13;
/// Replays the CRC-framed write-ahead log at `wal_path` into `index` and `mmap_data`.
///
/// Each vector is assumed to occupy `dimension * size_of::<f32>()` bytes. Entries are
/// applied in file order until the log ends or an entry cannot be replayed; the failure
/// of an individual entry is not reported to the caller. If at least one entry was
/// replayed, the WAL file is truncated to zero length afterwards.
///
/// Returns the number of entries replayed, or `0` when there is no usable WAL
/// (missing, empty, or not CRC-framed).
///
/// # Errors
///
/// Propagates I/O errors from opening or probing the WAL and from truncating it.
#[allow(clippy::module_name_repetitions)]
pub(crate) fn replay_wal_to_index(
    wal_path: &Path,
    index: &ShardedIndex,
    dimension: usize,
    mmap_data: &mut [u8],
    next_offset: &mut usize,
) -> io::Result<usize> {
    let Some((mut wal_reader, wal_len)) = open_crc_wal(wal_path)? else {
        // Nothing to replay.
        return Ok(0);
    };

    let bytes_per_vector = dimension * std::mem::size_of::<f32>();
    let applied = drain_wal_entries(
        &mut wal_reader,
        wal_len,
        index,
        mmap_data,
        next_offset,
        bytes_per_vector,
    );

    // Leave the WAL untouched when not a single entry could be applied.
    if applied == 0 {
        return Ok(0);
    }
    truncate_wal(wal_path)?;
    Ok(applied)
}
/// Opens the WAL at `wal_path` for replay.
///
/// Returns `Ok(Some((reader, file_len)))` with the reader positioned at the start of the
/// file when the WAL exists, is non-empty and its first entry passes the CRC framing
/// probe. Returns `Ok(None)` when the file is missing, empty, or not CRC-framed.
///
/// # Errors
///
/// Propagates I/O errors from opening the file (other than "not found"), from reading
/// its metadata, and from the framing probe.
fn open_crc_wal(wal_path: &Path) -> io::Result<Option<(BufReader<File>, u64)>> {
    // `exists()` also reports `false` for paths whose metadata cannot be read; keeping the
    // guard preserves the "treat as no WAL" outcome for those paths.
    if !wal_path.exists() {
        return Ok(None);
    }
    let file = match File::open(wal_path) {
        Ok(file) => file,
        // The file may disappear between the existence check and the open; that is still
        // "no WAL to replay", not a failure.
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err),
    };
    let file_len = file.metadata()?.len();
    if file_len == 0 || !is_crc_framed_wal(wal_path, file_len)? {
        return Ok(None);
    }
    // Reuse the handle that produced `file_len` instead of opening the path a second
    // time, so the length and the bytes that get replayed come from the same file.
    Ok(Some((BufReader::new(file), file_len)))
}
/// Replays entries from `reader` one after another and returns how many were replayed.
///
/// Replay stops at the first entry for which `replay_one_entry` does not return
/// `Ok(true)`: either the end of the log (`Ok(false)`) or an error. The error itself is
/// discarded; only the count of entries replayed before it is returned.
fn drain_wal_entries(
    reader: &mut BufReader<File>,
    file_len: u64,
    index: &ShardedIndex,
    mmap_data: &mut [u8],
    next_offset: &mut usize,
    vector_size: usize,
) -> usize {
    let mut applied = 0usize;
    loop {
        match replay_one_entry(reader, file_len, index, mmap_data, next_offset, vector_size) {
            Ok(true) => applied += 1,
            Ok(false) | Err(_) => break,
        }
    }
    applied
}
/// Probes whether the file at `wal_path` begins with a valid CRC-framed entry.
///
/// A file shorter than the smallest frame is rejected without being opened. Otherwise
/// the first op byte selects the check: `1` validates a store frame, `2` validates a
/// delete frame, and any other value (or a failed read of the op byte) yields
/// `Ok(false)`.
///
/// # Errors
///
/// Propagates failures to open the file and errors from the store-frame validation.
fn is_crc_framed_wal(wal_path: &Path, file_len: u64) -> io::Result<bool> {
    let smallest_frame = MIN_STORE_ENTRY.min(DELETE_ENTRY_SIZE) as u64;
    if file_len < smallest_frame {
        return Ok(false);
    }

    let mut probe = BufReader::new(File::open(wal_path)?);
    let mut opcode = [0u8; 1];
    let Ok(()) = probe.read_exact(&mut opcode) else {
        return Ok(false);
    };

    match opcode {
        [1] => validate_first_store_crc(&mut probe),
        [2] => Ok(validate_first_delete_crc(&mut probe)),
        _ => Ok(false),
    }
}
/// Checks whether `reader`, positioned just after a store op byte, holds a store frame
/// (`id`, payload length, payload, CRC) whose CRC matches.
///
/// Returns `Ok(false)` when any part of the frame cannot be read in full or the CRC
/// differs, and `Ok(true)` when the stored CRC equals the CRC of op byte + id + length +
/// payload.
///
/// The payload is read through a length-limited reader rather than into a buffer
/// pre-sized from the length field: this function probes files that may not be
/// CRC-framed at all, so the length field can be arbitrary bytes and must not drive a
/// multi-gigabyte allocation.
///
/// # Errors
///
/// Returns `InvalidData` if the declared payload length does not fit in `usize`.
fn validate_first_store_crc(reader: &mut BufReader<File>) -> io::Result<bool> {
    let mut id_bytes = [0u8; 8];
    let mut len_bytes = [0u8; 4];
    if reader.read_exact(&mut id_bytes).is_err() || reader.read_exact(&mut len_bytes).is_err() {
        return Ok(false);
    }
    let declared_len = u32::from_le_bytes(len_bytes);
    let data_len = usize::try_from(declared_len)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "data_len overflow"))?;

    // Build the checksummed frame in place: header first, payload appended by the read.
    let mut frame = Vec::with_capacity(1 + 8 + 4);
    frame.push(1u8);
    frame.extend_from_slice(&id_bytes);
    frame.extend_from_slice(&len_bytes);

    // `take` caps the read at the declared length; `read_to_end` grows the buffer only as
    // bytes actually arrive, so a bogus length costs at most the remaining file size.
    let payload_read = reader
        .by_ref()
        .take(u64::from(declared_len))
        .read_to_end(&mut frame);
    match payload_read {
        Ok(read) if read == data_len => {}
        // Short payload or I/O failure: not a valid frame.
        _ => return Ok(false),
    }

    let mut stored_crc = [0u8; 4];
    if reader.read_exact(&mut stored_crc).is_err() {
        return Ok(false);
    }
    Ok(crc32_hash(&frame) == u32::from_le_bytes(stored_crc))
}
/// Checks whether `reader`, positioned just after a delete op byte, holds an 8-byte id
/// followed by a 4-byte little-endian CRC that matches the CRC of op byte + id.
///
/// Returns `false` if either field cannot be read in full or the CRC differs.
fn validate_first_delete_crc(reader: &mut BufReader<File>) -> bool {
    let mut id = [0u8; 8];
    let mut crc = [0u8; 4];
    // `&&` short-circuits, so the CRC is only read once the id was read successfully.
    let complete = reader.read_exact(&mut id).is_ok() && reader.read_exact(&mut crc).is_ok();
    if !complete {
        return false;
    }

    // Checksummed frame: the op byte (2) followed by the id.
    let mut frame = [2u8; 1 + 8];
    frame[1..].copy_from_slice(&id);
    u32::from_le_bytes(crc) == crc32_hash(&frame)
}
/// Replays the next WAL entry from `reader`.
///
/// Returns `Ok(false)` when the reader is at or past `file_len`, or when the op byte
/// cannot be read. Otherwise dispatches on the op byte: `1` replays a store entry and
/// `2` replays a delete entry, each returning that handler's result.
///
/// # Errors
///
/// Returns `InvalidData` for an unrecognised op byte, and propagates errors from
/// querying the stream position and from the store/delete handlers.
fn replay_one_entry(
    reader: &mut BufReader<File>,
    file_len: u64,
    index: &ShardedIndex,
    mmap_data: &mut [u8],
    next_offset: &mut usize,
    vector_size: usize,
) -> io::Result<bool> {
    let position = reader.stream_position()?;
    if position >= file_len {
        return Ok(false);
    }

    let mut opcode = 0u8;
    if reader.read_exact(std::slice::from_mut(&mut opcode)).is_err() {
        return Ok(false);
    }

    match opcode {
        1 => replay_store(reader, index, mmap_data, next_offset, vector_size),
        2 => replay_delete(reader, index),
        _ => Err(io::Error::new(io::ErrorKind::InvalidData, "unknown WAL op")),
    }
}
/// Reads one store entry (`id`, payload length, payload, CRC) from `reader`, which must
/// be positioned just after the store op byte, and verifies its CRC.
///
/// Returns the decoded id and the payload bytes.
///
/// The payload buffer is not sized directly from the length field: the CRC can only be
/// checked after the payload has been read, so a torn or corrupted entry may declare any
/// length up to `u32::MAX`. Reading through a length-limited reader keeps memory use
/// bounded by the bytes actually present in the file.
///
/// # Errors
///
/// * `UnexpectedEof` if the id, length, payload or CRC cannot be read in full.
/// * `InvalidData` if the declared length does not fit in `usize` or the CRC mismatches.
/// * Any other I/O error raised while reading.
fn read_store_entry(reader: &mut BufReader<File>) -> io::Result<(u64, Vec<u8>)> {
    // Upper bound on what is allocated up front on the word of the length field alone.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut id_bytes = [0u8; 8];
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut id_bytes)?;
    reader.read_exact(&mut len_bytes)?;
    let declared_len = u32::from_le_bytes(len_bytes);
    let data_len = usize::try_from(declared_len)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "data_len overflow"))?;

    let mut data = Vec::with_capacity(data_len.min(MAX_PREALLOC));
    let read = reader
        .by_ref()
        .take(u64::from(declared_len))
        .read_to_end(&mut data)?;
    if read != data_len {
        // Same error kind a failed `read_exact` reports for a short read.
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "truncated WAL store payload",
        ));
    }

    let mut stored_crc = [0u8; 4];
    reader.read_exact(&mut stored_crc)?;
    verify_store_crc(id_bytes, len_bytes, &data, stored_crc)?;
    Ok((u64::from_le_bytes(id_bytes), data))
}
/// Verifies the CRC of a store frame.
///
/// The checksummed frame is the op byte `1` followed by `id_bytes`, `len_bytes` and
/// `data`; `stored_crc` is interpreted as a little-endian `u32`.
///
/// # Errors
///
/// Returns `InvalidData` ("CRC mismatch") when the computed CRC differs from the stored
/// one.
fn verify_store_crc(
    id_bytes: [u8; 8],
    len_bytes: [u8; 4],
    data: &[u8],
    stored_crc: [u8; 4],
) -> io::Result<()> {
    let frame: Vec<u8> = [&[1u8][..], &id_bytes[..], &len_bytes[..], data].concat();
    let expected = u32::from_le_bytes(stored_crc);
    if crc32_hash(&frame) != expected {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "CRC mismatch"));
    }
    Ok(())
}
/// Reads one store entry from `reader` and applies it to the mmap buffer and the index.
///
/// An entry whose payload length differs from `vector_size` is read and CRC-checked but
/// not applied. Either way a successfully read entry yields `Ok(true)`.
///
/// # Errors
///
/// Propagates any error from reading or verifying the entry.
fn replay_store(
    reader: &mut BufReader<File>,
    index: &ShardedIndex,
    mmap_data: &mut [u8],
    next_offset: &mut usize,
    vector_size: usize,
) -> io::Result<bool> {
    read_store_entry(reader).map(|(id, payload)| {
        if payload.len() == vector_size {
            apply_store_to_mmap(id, &payload, index, mmap_data, next_offset, vector_size);
        }
        true
    })
}
/// Writes the vector bytes in `data` for `id` into `mmap_data` and records the offset in
/// `index`.
///
/// An id already present in `index` is overwritten at its existing offset; a new id is
/// placed at `*next_offset`. The write is skipped entirely when `data` is not exactly
/// `vector_size` bytes long, when the end offset would overflow `usize`, or when the
/// target range does not fit inside `mmap_data`.
///
/// `*next_offset` is advanced by `vector_size` only after a new vector has actually been
/// written, so an entry that does not fit reserves no space and leaves no unindexed gap.
fn apply_store_to_mmap(
    id: u64,
    data: &[u8],
    index: &ShardedIndex,
    mmap_data: &mut [u8],
    next_offset: &mut usize,
    vector_size: usize,
) {
    // `copy_from_slice` panics on a length mismatch; refuse such input instead.
    if data.len() != vector_size {
        return;
    }
    let existing = index.get(id);
    let offset = existing.unwrap_or(*next_offset);
    let Some(end) = offset.checked_add(vector_size) else {
        return;
    };
    // `get_mut` yields `None` when the range reaches past the end of the buffer.
    let Some(slot) = mmap_data.get_mut(offset..end) else {
        return;
    };
    slot.copy_from_slice(data);
    if existing.is_none() {
        // Commit the allocation only now that the bytes are in place.
        *next_offset = end;
    }
    index.insert(id, offset);
}
/// Reads one delete entry (8-byte id, 4-byte little-endian CRC) from `reader`, which
/// must be positioned just after the delete op byte, and removes the id from `index`.
///
/// Returns `Ok(true)` once the id has been removed.
///
/// # Errors
///
/// Propagates read errors, and returns `InvalidData` ("CRC mismatch") when the stored
/// CRC differs from the CRC of op byte + id; in that case `index` is left untouched.
fn replay_delete(reader: &mut BufReader<File>, index: &ShardedIndex) -> io::Result<bool> {
    let mut id = [0u8; 8];
    reader.read_exact(&mut id)?;
    let mut crc = [0u8; 4];
    reader.read_exact(&mut crc)?;

    // Checksummed frame: the op byte (2) followed by the id.
    let mut frame = [2u8; 1 + 8];
    frame[1..].copy_from_slice(&id);

    if crc32_hash(&frame) == u32::from_le_bytes(crc) {
        index.remove(u64::from_le_bytes(id));
        Ok(true)
    } else {
        Err(io::Error::new(io::ErrorKind::InvalidData, "CRC mismatch"))
    }
}
/// Truncates the existing file at `wal_path` to zero length and syncs it to disk.
///
/// The file is opened for writing without being created, so a missing file is an error.
///
/// # Errors
///
/// Propagates errors from opening the file, setting its length, or syncing it.
fn truncate_wal(wal_path: &Path) -> io::Result<()> {
    let wal = OpenOptions::new().write(true).open(wal_path)?;
    wal.set_len(0).and_then(|()| wal.sync_all())
}