use super::snapshot;
use super::traits::PayloadStorage;
#[allow(unused_imports)] pub(crate) use snapshot::{crc32_hash, SNAPSHOT_MAGIC, SNAPSHOT_VERSION};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// How far each WAL write is pushed toward stable storage before the call returns.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DurabilityMode {
    /// Flush the WAL's in-process buffer and then `File::sync_all` the log. This is the default.
    #[default]
    Fsync,
    /// Flush the WAL's in-process buffer to the OS without forcing it to disk.
    FlushOnly,
    /// Do nothing after a write; buffered bytes are flushed lazily (e.g. before a read).
    None,
}
/// Append-only, log-structured store for JSON payloads keyed by `u64` id.
///
/// Writes append records to `payloads.log` inside `path`. An in-memory index maps
/// every live id to the WAL offset of its record's length field. A snapshot file
/// in the same directory lets startup skip the WAL prefix it already covers.
#[allow(clippy::module_name_repetitions)]
pub struct LogPayloadStorage {
    /// Directory that holds the WAL and the snapshot file.
    path: PathBuf,
    /// id -> byte offset of that record's 4-byte payload-length field in the WAL.
    index: RwLock<FxHashMap<u64, u64>>,
    /// Buffered, append-mode writer for the WAL.
    wal: RwLock<io::BufWriter<File>>,
    /// Independent read handle on the WAL, used to seek to indexed records.
    reader: RwLock<File>,
    /// WAL position covered by the most recent snapshot (0 when none was used at startup).
    last_snapshot_wal_pos: RwLock<u64>,
    /// Policy applied to the WAL after each write operation.
    durability: DurabilityMode,
    /// Logical end of the WAL, i.e. where the next appended record starts.
    write_offset: RwLock<u64>,
}
/// Store record written by older versions: no trailing CRC.
const LEGACY_STORE_MARKER: u8 = 0x01;
/// Delete record written by older versions: no trailing CRC.
const LEGACY_DELETE_MARKER: u8 = 0x02;
/// Store record followed by a little-endian CRC-32 of everything before it.
const CRC_STORE_MARKER: u8 = 0xC3;
/// Delete record followed by a little-endian CRC-32 of its marker and id.
const CRC_DELETE_MARKER: u8 = 0xC4;
/// Checksum of a CRC store record, excluding its trailing CRC field.
///
/// The hashed bytes are: `CRC_STORE_MARKER`, the id (little-endian `u64`), the
/// payload length (little-endian `u32`), then the payload bytes.
fn compute_store_crc(id: u64, payload: &[u8]) -> u32 {
    // Callers pass payloads whose length was decoded from a u32 field, so this
    // narrowing matches the on-disk value.
    #[allow(clippy::cast_possible_truncation)]
    let len_field = (payload.len() as u32).to_le_bytes();
    let framed: Vec<u8> = [
        &[CRC_STORE_MARKER][..],
        &id.to_le_bytes()[..],
        &len_field[..],
        payload,
    ]
    .concat();
    crc32_hash(&framed)
}
/// Serializes `payload` into a CRC store record and appends it to `wal`.
///
/// Record layout: marker (1) | id LE u64 (8) | payload length LE u32 (4) |
/// JSON payload | CRC-32 LE u32 (4). `record_buf` is scratch space that is
/// cleared and reused. On success, `offset` is advanced past the record, and
/// `index[id]` is set to the offset of the length field.
///
/// # Errors
/// `InvalidData` if JSON serialization fails, `InvalidInput` if the JSON is
/// larger than `u32::MAX` bytes, or any error from writing to `wal`.
fn write_store_record(
    wal: &mut io::BufWriter<File>,
    id: u64,
    payload: &serde_json::Value,
    offset: &mut u64,
    index: &mut FxHashMap<u64, u64>,
    record_buf: &mut Vec<u8>,
) -> io::Result<()> {
    /// Position of the 4-byte length field (after marker + id).
    const LEN_FIELD_AT: usize = 1 + 8;
    /// Bytes before the JSON payload: marker + id + length.
    const HEADER_LEN: usize = LEN_FIELD_AT + 4;

    let start = *offset;

    record_buf.clear();
    record_buf.push(CRC_STORE_MARKER);
    record_buf.extend_from_slice(&id.to_le_bytes());
    // Placeholder for the length; patched once the JSON size is known.
    record_buf.extend_from_slice(&[0u8; 4]);
    serde_json::to_writer(&mut *record_buf, payload)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

    let Ok(json_len) = u32::try_from(record_buf.len() - HEADER_LEN) else {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "Payload too large"));
    };
    record_buf[LEN_FIELD_AT..HEADER_LEN].copy_from_slice(&json_len.to_le_bytes());

    let checksum = crc32_hash(record_buf);
    record_buf.extend_from_slice(&checksum.to_le_bytes());

    wal.write_all(record_buf)?;

    // Header + payload + trailing CRC.
    *offset = start + HEADER_LEN as u64 + u64::from(json_len) + 4;
    index.insert(id, start + LEN_FIELD_AT as u64);
    Ok(())
}
/// Checksum of a CRC delete record, excluding its trailing CRC field.
///
/// The hashed bytes are `CRC_DELETE_MARKER` followed by the id as a little-endian `u64`.
fn compute_delete_crc(id: u64) -> u32 {
    // Pre-fill with the marker, then overwrite bytes 1..9 with the id.
    let mut header = [CRC_DELETE_MARKER; 1 + 8];
    header[1..].copy_from_slice(&id.to_le_bytes());
    crc32_hash(&header)
}
/// Decoded header of a single WAL record, produced during replay.
struct WalEntry {
    /// Operation the record encodes, with its target id.
    op: WalOp,
    /// File offset just past the 9-byte header (marker + id). For store records
    /// this is where the 4-byte payload-length field begins.
    pos_after_header: u64,
    /// `true` if the record carries a trailing CRC-32, `false` for legacy records.
    has_crc: bool,
}
/// Kind of operation recorded in a WAL entry.
enum WalOp {
    /// The record stores a payload for `id`.
    Store {
        id: u64,
    },
    /// The record deletes `id`.
    Delete {
        id: u64,
    },
}
impl WalEntry {
fn read(reader: &mut BufReader<File>, pos: u64) -> io::Result<Option<Self>> {
let mut marker = [0u8; 1];
if reader.read_exact(&mut marker).is_err() {
return Ok(None); }
let mut id_bytes = [0u8; 8];
reader.read_exact(&mut id_bytes)?;
let id = u64::from_le_bytes(id_bytes);
let pos_after_header = pos + 1 + 8;
let (op, has_crc) = match marker[0] {
LEGACY_STORE_MARKER => (WalOp::Store { id }, false),
LEGACY_DELETE_MARKER => (WalOp::Delete { id }, false),
CRC_STORE_MARKER => (WalOp::Store { id }, true),
CRC_DELETE_MARKER => (WalOp::Delete { id }, true),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unknown WAL marker",
))
}
};
Ok(Some(Self {
op,
pos_after_header,
has_crc,
}))
}
fn apply(
self,
index: &mut FxHashMap<u64, u64>,
reader: &mut BufReader<File>,
) -> io::Result<u64> {
match self.op {
WalOp::Store { id } => self.apply_store(id, index, reader),
WalOp::Delete { id } => self.apply_delete(id, index, reader),
}
}
fn apply_store(
&self,
id: u64,
index: &mut FxHashMap<u64, u64>,
reader: &mut BufReader<File>,
) -> io::Result<u64> {
let len_offset = self.pos_after_header;
let mut len_bytes = [0u8; 4];
reader.read_exact(&mut len_bytes)?;
let payload_len = u64::from(u32::from_le_bytes(len_bytes));
let end_pos = if self.has_crc {
self.apply_store_with_crc(id, payload_len, index, reader, len_offset)?
} else {
let skip = i64::try_from(payload_len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Payload too large"))?;
reader.seek(SeekFrom::Current(skip))?;
index.insert(id, len_offset);
self.pos_after_header + 4 + payload_len
};
Ok(end_pos)
}
fn apply_store_with_crc(
&self,
id: u64,
payload_len: u64,
index: &mut FxHashMap<u64, u64>,
reader: &mut BufReader<File>,
len_offset: u64,
) -> io::Result<u64> {
let payload_usize = usize::try_from(payload_len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Payload too large"))?;
let mut payload_buf = vec![0u8; payload_usize];
reader.read_exact(&mut payload_buf)?;
let mut crc_bytes = [0u8; 4];
reader.read_exact(&mut crc_bytes)?;
let stored_crc = u32::from_le_bytes(crc_bytes);
let computed_crc = compute_store_crc(id, &payload_buf);
if stored_crc == computed_crc {
index.insert(id, len_offset);
} else {
tracing::warn!(
id,
"WAL CRC mismatch on store entry — skipping corrupted entry"
);
}
Ok(self.pos_after_header + 4 + payload_len + 4)
}
fn apply_delete(
&self,
id: u64,
index: &mut FxHashMap<u64, u64>,
reader: &mut BufReader<File>,
) -> io::Result<u64> {
if self.has_crc {
let mut crc_bytes = [0u8; 4];
reader.read_exact(&mut crc_bytes)?;
let stored_crc = u32::from_le_bytes(crc_bytes);
let computed_crc = compute_delete_crc(id);
if stored_crc == computed_crc {
index.remove(&id);
} else {
tracing::warn!(
id,
"WAL CRC mismatch on delete entry — skipping corrupted entry"
);
}
Ok(self.pos_after_header + 4)
} else {
index.remove(&id);
Ok(self.pos_after_header)
}
}
}
impl LogPayloadStorage {
    /// Opens (creating if needed) a store rooted at directory `path`, using the
    /// default durability mode.
    ///
    /// # Errors
    /// Propagates I/O errors from creating the directory, opening the WAL, or
    /// replaying it into the index.
    pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        Self::new_with_durability(path, DurabilityMode::default())
    }

    /// Opens (creating if needed) a store rooted at directory `path` with an
    /// explicit durability mode.
    ///
    /// The index is rebuilt from `payloads.snapshot` plus the WAL written after it
    /// when that snapshot loads and lies within the current WAL. Otherwise the
    /// whole of `payloads.log` is replayed.
    ///
    /// # Errors
    /// Propagates I/O errors from creating the directory, opening the WAL, or
    /// replaying it (including malformed WAL records).
    pub fn new_with_durability<P: AsRef<Path>>(
        path: P,
        durability: DurabilityMode,
    ) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;
        let log_path = path.join("payloads.log");
        let wal = Self::open_wal_writer(&log_path)?;
        let (reader, wal_len) = Self::open_wal_reader(&log_path)?;
        let (index, last_snapshot_wal_pos) =
            Self::load_or_replay_index(&path, &log_path, wal_len)?;
        Ok(Self {
            path,
            index: RwLock::new(index),
            wal: RwLock::new(wal),
            reader: RwLock::new(reader),
            last_snapshot_wal_pos: RwLock::new(last_snapshot_wal_pos),
            durability,
            write_offset: RwLock::new(wal_len),
        })
    }

    /// Opens the WAL for buffered appends, creating the file if it is missing.
    fn open_wal_writer(log_path: &Path) -> io::Result<io::BufWriter<File>> {
        let writer_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(io::BufWriter::new(writer_file))
    }

    /// Opens a separate read handle on the WAL and returns it with the file's current length.
    fn open_wal_reader(log_path: &Path) -> io::Result<(File, u64)> {
        // Create-if-missing without truncating. `File::create` truncates, so pairing it
        // with an `exists()` check could wipe a log that appeared in between.
        OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        let reader = File::open(log_path)?;
        let wal_len = reader.metadata()?.len();
        Ok((reader, wal_len))
    }

    /// Builds the id -> offset index.
    ///
    /// Returns the index together with the WAL position the replay started from:
    /// the snapshot's position, or 0 for a full replay.
    fn load_or_replay_index(
        dir: &Path,
        log_path: &Path,
        wal_len: u64,
    ) -> io::Result<(FxHashMap<u64, u64>, u64)> {
        let snapshot_path = dir.join("payloads.snapshot");
        // Any failure to load the snapshot falls back to a full replay.
        let base = match snapshot::load_snapshot(&snapshot_path) {
            Ok((snapshot_index, snapshot_wal_pos)) if snapshot_wal_pos <= wal_len => {
                Some((snapshot_index, snapshot_wal_pos))
            }
            Ok((_, snapshot_wal_pos)) => {
                // The snapshot claims to cover more WAL than exists, so its offsets
                // would point past the end of the log.
                tracing::warn!(
                    snapshot_wal_pos,
                    wal_len,
                    "Snapshot position is beyond end of WAL; ignoring snapshot and replaying full WAL"
                );
                None
            }
            Err(_) => None,
        };
        let (start_index, start_pos) = base.unwrap_or_else(|| (FxHashMap::default(), 0));
        let index = Self::replay_wal_from(log_path, start_index, start_pos, wal_len)?;
        Ok((index, start_pos))
    }

    /// Pushes buffered WAL bytes toward disk according to `mode`.
    ///
    /// `Fsync` flushes and then calls `sync_all`. `FlushOnly` only flushes.
    /// `None` does nothing.
    fn sync_wal(wal: &mut io::BufWriter<File>, mode: DurabilityMode) -> io::Result<()> {
        match mode {
            DurabilityMode::Fsync => {
                wal.flush()?;
                wal.get_ref().sync_all()?;
            }
            DurabilityMode::FlushOnly => {
                wal.flush()?;
            }
            DurabilityMode::None => {}
        }
        Ok(())
    }

    /// Runs `sync_wal`. On failure, it re-derives `offset` from the actual log state
    /// before returning the error.
    ///
    /// A failed `BufWriter::flush` keeps the bytes it could not write in its buffer,
    /// and writes them on a later flush or write. The logical end of the WAL is
    /// therefore the file length *plus* whatever is still buffered.
    fn sync_wal_or_resync(
        wal: &mut io::BufWriter<File>,
        mode: DurabilityMode,
        offset: &mut u64,
    ) -> io::Result<()> {
        if let Err(e) = Self::sync_wal(wal, mode) {
            if let Ok(meta) = wal.get_ref().metadata() {
                *offset = meta.len() + wal.buffer().len() as u64;
            }
            return Err(e);
        }
        Ok(())
    }

    /// Replays WAL records in `[start_pos, end_pos)` on top of `index`.
    ///
    /// Replay stops early, without error, if the file ends exactly at a record boundary.
    fn replay_wal_from(
        log_path: &Path,
        mut index: FxHashMap<u64, u64>,
        start_pos: u64,
        end_pos: u64,
    ) -> io::Result<FxHashMap<u64, u64>> {
        if start_pos >= end_pos {
            return Ok(index);
        }
        let file = File::open(log_path)?;
        let mut reader_buf = BufReader::new(file);
        reader_buf.seek(SeekFrom::Start(start_pos))?;
        let mut pos = start_pos;
        while pos < end_pos {
            let Some(entry) = WalEntry::read(&mut reader_buf, pos)? else {
                break;
            };
            pos = entry.apply(&mut index, &mut reader_buf)?;
        }
        Ok(index)
    }

    /// Writes a snapshot of the current index, tagged with the current WAL write offset.
    ///
    /// The WAL is flushed and `sync_all`ed first, regardless of the durability mode.
    ///
    /// # Errors
    /// Propagates flush, sync, or snapshot-writing errors. On error,
    /// `last_snapshot_wal_pos` is left unchanged.
    pub fn create_snapshot(&mut self) -> io::Result<()> {
        {
            let mut wal = self.wal.write();
            wal.flush()?;
            wal.get_ref().sync_all()?;
        }
        let index = self.index.read();
        let wal_pos = *self.write_offset.read();
        snapshot::create_snapshot_file(&self.path, &index, wal_pos)?;
        *self.last_snapshot_wal_pos.write() = wal_pos;
        Ok(())
    }

    /// Reports whether enough WAL has accumulated since the last snapshot.
    ///
    /// The threshold decision is delegated to `snapshot::should_create_snapshot`.
    #[must_use]
    pub fn should_create_snapshot(&self) -> bool {
        snapshot::should_create_snapshot(
            *self.last_snapshot_wal_pos.read(),
            *self.write_offset.read(),
        )
    }

    /// Takes a snapshot if one is due.
    ///
    /// Failures are logged rather than returned, because the triggering write has
    /// already succeeded.
    fn maybe_auto_snapshot(&mut self) {
        if self.should_create_snapshot() {
            if let Err(e) = self.create_snapshot() {
                tracing::warn!(
                    error = %e,
                    "Auto-snapshot after WAL growth failed; will retry on next write"
                );
            }
        }
    }

    /// Appends one store record per entry, then applies the durability mode once for the batch.
    ///
    /// There is no rollback: if a record fails to write, the records before it in
    /// the batch stay in the WAL writer and the index.
    ///
    /// # Errors
    /// Propagates serialization, write, or sync errors.
    pub fn store_batch(&mut self, entries: &[(u64, &serde_json::Value)]) -> io::Result<()> {
        if entries.is_empty() {
            return Ok(());
        }
        {
            let mut wal = self.wal.write();
            let mut index = self.index.write();
            let mut offset = self.write_offset.write();
            let mut record_buf = Vec::with_capacity(256);
            for &(id, payload) in entries {
                write_store_record(
                    &mut wal,
                    id,
                    payload,
                    &mut offset,
                    &mut index,
                    &mut record_buf,
                )?;
            }
            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
        }
        self.maybe_auto_snapshot();
        Ok(())
    }
}
impl PayloadStorage for LogPayloadStorage {
    /// Appends a CRC-protected store record for `id` and points the index at it.
    ///
    /// It then applies the configured durability mode and may take an automatic snapshot.
    fn store(&mut self, id: u64, payload: &serde_json::Value) -> io::Result<()> {
        {
            let mut wal = self.wal.write();
            let mut index = self.index.write();
            let mut offset = self.write_offset.write();
            let mut record_buf = Vec::new();
            write_store_record(
                &mut wal,
                id,
                payload,
                &mut offset,
                &mut index,
                &mut record_buf,
            )?;
            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
        }
        self.maybe_auto_snapshot();
        Ok(())
    }

    /// Decodes the JSON payload stored for `id`.
    ///
    /// Returns `Ok(None)` if `id` is not in the index.
    fn retrieve(&self, id: u64) -> io::Result<Option<serde_json::Value>> {
        let Some(offset) = self.index.read().get(&id).copied() else {
            return Ok(None);
        };
        // The reader is a separate file handle, so it cannot see bytes still held in
        // the WAL's BufWriter. That is normal in `DurabilityMode::None`, and can also
        // happen in the other modes after a failed flush left data buffered.
        // The read guard is released at the end of this statement, before any write lock.
        let has_buffered_bytes = !self.wal.read().buffer().is_empty();
        if has_buffered_bytes {
            self.wal.write().flush()?;
        }
        let mut reader = self.reader.write();
        reader.seek(SeekFrom::Start(offset))?;
        let mut len_bytes = [0u8; 4];
        reader.read_exact(&mut len_bytes)?;
        let len = u32::from_le_bytes(len_bytes) as usize;
        let mut payload_bytes = vec![0u8; len];
        reader.read_exact(&mut payload_bytes)?;
        let payload = serde_json::from_slice(&payload_bytes)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        Ok(Some(payload))
    }

    /// Appends a CRC-protected delete record for `id` and removes it from the index.
    ///
    /// It then applies the configured durability mode and may take an automatic snapshot.
    fn delete(&mut self, id: u64) -> io::Result<()> {
        const DELETE_RECORD_LEN: usize = 1 + 8 + 4;
        let mut record = [0u8; DELETE_RECORD_LEN];
        record[0] = CRC_DELETE_MARKER;
        record[1..9].copy_from_slice(&id.to_le_bytes());
        record[9..13].copy_from_slice(&compute_delete_crc(id).to_le_bytes());
        {
            let mut wal = self.wal.write();
            let mut index = self.index.write();
            let mut offset = self.write_offset.write();
            wal.write_all(&record)?;
            // Same ordering as `write_store_record`: once the record has been handed to
            // the WAL writer, advance the offset and update the index. A buffered
            // tombstone is still written by a later flush even if the sync below fails.
            // In that case `sync_wal_or_resync` recomputes the offset from the log.
            *offset += DELETE_RECORD_LEN as u64;
            index.remove(&id);
            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
        }
        self.maybe_auto_snapshot();
        Ok(())
    }

    /// Applies the configured durability mode to any buffered WAL bytes.
    fn flush(&mut self) -> io::Result<()> {
        let mut wal = self.wal.write();
        Self::sync_wal(&mut wal, self.durability)
    }

    /// Returns every id currently in the index, in no particular order.
    fn ids(&self) -> Vec<u64> {
        self.index.read().keys().copied().collect()
    }
}