use super::log_payload_io::{compute_delete_crc, write_store_record, CRC_DELETE_MARKER};
use super::snapshot;
use super::traits::PayloadStorage;
#[allow(unused_imports)] pub(crate) use snapshot::{crc32_hash, SNAPSHOT_MAGIC, SNAPSHOT_VERSION};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// Policy applied to the write-ahead log after each durable write.
///
/// The variants map onto the steps `LogPayloadStorage` performs after
/// appending a record: flush the write buffer, and optionally `fsync` the file.
#[non_exhaustive]
#[derive(Debug, Default)]
#[derive(Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum DurabilityMode {
    /// Flush the write buffer and `fsync` the log file (the default).
    #[default]
    Fsync,
    /// Flush the write buffer to the OS without forcing it to stable storage.
    FlushOnly,
    /// Skip the per-write sync step; records may remain in the in-memory buffer.
    None,
}
/// Append-only, log-structured storage for JSON payloads keyed by `u64` id.
///
/// Records are appended to `payloads.log` inside `path`. An in-memory index maps
/// each live id to the log offset of its length-prefixed payload. Snapshots
/// (`payloads.snapshot`) record the index plus a log position so that opening
/// the store only needs to replay the log tail written after that position.
#[allow(clippy::module_name_repetitions)]
pub struct LogPayloadStorage {
    /// Directory holding `payloads.log` and `payloads.snapshot`.
    path: PathBuf,
    /// Live id -> byte offset in the log where that id's length-prefixed JSON
    /// payload starts.
    index: RwLock<FxHashMap<u64, u64>>,
    /// Buffered, append-mode writer for `payloads.log`.
    wal: RwLock<io::BufWriter<File>>,
    /// Separate read-only handle on the same log, used for random-access reads.
    reader: RwLock<File>,
    /// Log position captured by the most recent snapshot (0 if none was loaded).
    last_snapshot_wal_pos: RwLock<u64>,
    /// Sync policy applied after writes.
    durability: DurabilityMode,
    /// Logical end of the log; advanced as records are written, so it can include
    /// bytes still held in the `BufWriter`.
    write_offset: RwLock<u64>,
}
use super::wal_entry::WalEntry;
impl LogPayloadStorage {
    /// Opens (or creates) a payload store in `path` using the default
    /// durability mode ([`DurabilityMode::Fsync`]).
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating the directory, opening the log, or
    /// rebuilding the in-memory index.
    pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        Self::new_with_durability(path, DurabilityMode::default())
    }

    /// Opens (or creates) a payload store in `path` with an explicit durability mode.
    ///
    /// The directory is created if missing. The index is rebuilt from
    /// `payloads.snapshot` plus the log tail when a usable snapshot exists,
    /// otherwise by replaying `payloads.log` from the beginning.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from directory creation, opening the log, or replay.
    pub fn new_with_durability<P: AsRef<Path>>(
        path: P,
        durability: DurabilityMode,
    ) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;
        let log_path = path.join("payloads.log");
        // Opening the writer first (with `create(true)`) guarantees the log file
        // exists before the read handle is opened.
        let wal = Self::open_wal_writer(&log_path)?;
        let (reader, wal_len) = Self::open_wal_reader(&log_path)?;
        let (index, last_snapshot_wal_pos) =
            Self::load_or_replay_index(&path, &log_path, wal_len)?;
        Ok(Self {
            path,
            index: RwLock::new(index),
            wal: RwLock::new(wal),
            reader: RwLock::new(reader),
            last_snapshot_wal_pos: RwLock::new(last_snapshot_wal_pos),
            durability,
            write_offset: RwLock::new(wal_len),
        })
    }

    /// Opens the log for appending (creating it if needed), wrapped in a `BufWriter`.
    fn open_wal_writer(log_path: &Path) -> io::Result<io::BufWriter<File>> {
        let writer_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(log_path)?;
        Ok(io::BufWriter::new(writer_file))
    }

    /// Opens a read-only handle on the log and returns it with the log's current length.
    fn open_wal_reader(log_path: &Path) -> io::Result<(File, u64)> {
        if !log_path.exists() {
            File::create(log_path)?;
        }
        let reader = File::open(log_path)?;
        let wal_len = reader.metadata()?.len();
        Ok((reader, wal_len))
    }

    /// Rebuilds the id -> offset index and returns it with the log position the
    /// replay started from.
    ///
    /// A snapshot is used only if it loads successfully AND its recorded log
    /// position lies within the current log (`<= wal_len`). A position past the
    /// end means the log was truncated or replaced after the snapshot was taken.
    /// Trusting such a snapshot would leave index offsets pointing beyond EOF, so
    /// in that case (and when no snapshot can be loaded) the whole log is replayed.
    fn load_or_replay_index(
        dir: &Path,
        log_path: &Path,
        wal_len: u64,
    ) -> io::Result<(FxHashMap<u64, u64>, u64)> {
        let snapshot_path = dir.join("payloads.snapshot");
        let snapshot = match snapshot::load_snapshot(&snapshot_path) {
            Ok((_, pos)) if pos > wal_len => {
                tracing::warn!(
                    snapshot_wal_pos = pos,
                    wal_len,
                    "Snapshot points past the end of the WAL; ignoring it and replaying the full log"
                );
                None
            }
            Ok(loaded) => Some(loaded),
            Err(_) => None,
        };
        let (base_index, start_pos) = snapshot.unwrap_or_else(|| (FxHashMap::default(), 0));
        let index = Self::replay_wal_from(log_path, base_index, start_pos, wal_len)?;
        Ok((index, start_pos))
    }

    /// Applies the durability policy: `Fsync` flushes and `sync_all`s,
    /// `FlushOnly` flushes, and `None` does nothing.
    fn sync_wal(wal: &mut io::BufWriter<File>, mode: DurabilityMode) -> io::Result<()> {
        match mode {
            DurabilityMode::Fsync => {
                wal.flush()?;
                wal.get_ref().sync_all()?;
            }
            DurabilityMode::FlushOnly => {
                wal.flush()?;
            }
            DurabilityMode::None => {}
        }
        Ok(())
    }

    /// Like [`Self::sync_wal`], but on failure resets `offset` to the log file's
    /// on-disk length (when it can be read) before returning the original error.
    /// This keeps future record offsets aligned with what actually reached the file.
    fn sync_wal_or_resync(
        wal: &mut io::BufWriter<File>,
        mode: DurabilityMode,
        offset: &mut u64,
    ) -> io::Result<()> {
        if let Err(e) = Self::sync_wal(wal, mode) {
            if let Ok(meta) = wal.get_ref().metadata() {
                *offset = meta.len();
            }
            return Err(e);
        }
        Ok(())
    }

    /// Replays log entries in `[start_pos, end_pos)` on top of `index`.
    ///
    /// Replay stops early when `WalEntry::read` yields `None` (presumably a
    /// truncated or invalid trailing record; that decision is made in `WalEntry`).
    fn replay_wal_from(
        log_path: &Path,
        mut index: FxHashMap<u64, u64>,
        start_pos: u64,
        end_pos: u64,
    ) -> io::Result<FxHashMap<u64, u64>> {
        if start_pos >= end_pos {
            return Ok(index);
        }
        let file = File::open(log_path)?;
        let mut reader_buf = BufReader::new(file);
        reader_buf.seek(SeekFrom::Start(start_pos))?;
        let mut pos = start_pos;
        while pos < end_pos {
            let Some(entry) = WalEntry::read(&mut reader_buf, pos)? else {
                break;
            };
            pos = entry.apply(&mut index, &mut reader_buf)?;
        }
        Ok(index)
    }

    /// Writes a snapshot of the current index tagged with the current log position.
    ///
    /// The log is always flushed and fsynced first, whatever the durability mode,
    /// so the snapshot never refers to log bytes that are not yet on disk.
    ///
    /// # Errors
    ///
    /// Returns any error from syncing the log or writing the snapshot file. If the
    /// sync fails, `write_offset` is resynchronized with the file length.
    pub fn create_snapshot(&mut self) -> io::Result<()> {
        {
            let mut wal = self.wal.write();
            let mut offset = self.write_offset.write();
            Self::sync_wal_or_resync(&mut wal, DurabilityMode::Fsync, &mut offset)?;
        }
        let index = self.index.read();
        let wal_pos = *self.write_offset.read();
        snapshot::create_snapshot_file(&self.path, &index, wal_pos)?;
        *self.last_snapshot_wal_pos.write() = wal_pos;
        Ok(())
    }

    /// Asks the snapshot policy whether the log has grown enough since the last
    /// snapshot to warrant a new one.
    #[must_use]
    pub fn should_create_snapshot(&self) -> bool {
        snapshot::should_create_snapshot(
            *self.last_snapshot_wal_pos.read(),
            *self.write_offset.read(),
        )
    }

    /// Best-effort snapshot after a write. Failures are logged and swallowed so
    /// they never fail the write that triggered them; the next write retries.
    fn maybe_auto_snapshot(&mut self) {
        if self.should_create_snapshot() {
            if let Err(e) = self.create_snapshot() {
                tracing::warn!(
                    error = %e,
                    "Auto-snapshot after WAL growth failed; will retry on next write"
                );
            }
        }
    }

    /// Appends all `entries` and then applies the configured durability mode once.
    ///
    /// # Errors
    ///
    /// Returns any error from writing a record or syncing the log.
    pub fn store_batch(&mut self, entries: &[(u64, &serde_json::Value)]) -> io::Result<()> {
        self.store_batch_inner(entries, true)
    }

    /// Appends all `entries` and flushes the write buffer to the OS, but skips the
    /// configured durability step (no fsync).
    ///
    /// # Errors
    ///
    /// Returns any error from writing a record or flushing the log.
    pub fn store_batch_deferred(
        &mut self,
        entries: &[(u64, &serde_json::Value)],
    ) -> io::Result<()> {
        self.store_batch_inner(entries, false)
    }

    /// Shared body of the batch writers. With `fsync` the configured durability
    /// mode is applied; without it the buffer is only flushed.
    fn store_batch_inner(
        &mut self,
        entries: &[(u64, &serde_json::Value)],
        fsync: bool,
    ) -> io::Result<()> {
        if entries.is_empty() {
            return Ok(());
        }
        {
            let mut wal = self.wal.write();
            let mut index = self.index.write();
            let mut offset = self.write_offset.write();
            let mut record_buf = Vec::with_capacity(256);
            for &(id, payload) in entries {
                write_store_record(
                    &mut wal,
                    id,
                    payload,
                    &mut offset,
                    &mut index,
                    &mut record_buf,
                )?;
            }
            // The deferred path still flushes (regardless of the configured mode).
            // Routing it through the resync helper keeps `write_offset` consistent
            // with the file if that flush fails, exactly as on the durable path.
            let mode = if fsync {
                self.durability
            } else {
                DurabilityMode::FlushOnly
            };
            Self::sync_wal_or_resync(&mut wal, mode, &mut offset)?;
        }
        self.maybe_auto_snapshot();
        Ok(())
    }
}
impl PayloadStorage for LogPayloadStorage {
    /// Appends a store record for `id`, applies the durability mode, and may
    /// trigger a best-effort auto-snapshot.
    fn store(&mut self, id: u64, payload: &serde_json::Value) -> io::Result<()> {
        {
            let mut wal = self.wal.write();
            let mut index = self.index.write();
            let mut offset = self.write_offset.write();
            let mut record_buf = Vec::new();
            write_store_record(
                &mut wal,
                id,
                payload,
                &mut offset,
                &mut index,
                &mut record_buf,
            )?;
            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
        }
        self.maybe_auto_snapshot();
        Ok(())
    }

    /// Reads the payload for `id` from the log, or `None` if the id is not indexed.
    ///
    /// The record at the indexed offset is a 4-byte little-endian length followed
    /// by that many bytes of JSON.
    fn retrieve(&self, id: u64) -> io::Result<Option<serde_json::Value>> {
        // The index guard is a temporary, so it is released at the end of this statement.
        let Some(offset) = self.index.read().get(&id).copied() else {
            return Ok(None);
        };
        // With `DurabilityMode::None` writes are not flushed eagerly, so push any
        // buffered bytes to the file before reading through the separate handle.
        if self.durability == DurabilityMode::None {
            self.wal.write().flush()?;
        }
        let mut reader = self.reader.write();
        reader.seek(SeekFrom::Start(offset))?;
        let mut len_bytes = [0u8; 4];
        reader.read_exact(&mut len_bytes)?;
        let len = usize::try_from(u32::from_le_bytes(len_bytes))
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let mut payload_bytes = vec![0u8; len];
        reader.read_exact(&mut payload_bytes)?;
        let payload = serde_json::from_slice(&payload_bytes)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        Ok(Some(payload))
    }

    /// Appends a delete record for `id`, applies the durability mode, and removes
    /// `id` from the index only once that succeeds.
    fn delete(&mut self, id: u64) -> io::Result<()> {
        // Layout: 1-byte marker, 8-byte little-endian id, 4-byte little-endian CRC.
        const DELETE_RECORD_LEN: usize = 1 + 8 + 4;
        let crc = compute_delete_crc(id);
        let mut record = [0u8; DELETE_RECORD_LEN];
        record[0] = CRC_DELETE_MARKER;
        record[1..9].copy_from_slice(&id.to_le_bytes());
        record[9..13].copy_from_slice(&crc.to_le_bytes());
        {
            let mut wal = self.wal.write();
            let mut index = self.index.write();
            let mut offset = self.write_offset.write();
            wal.write_all(&record)?;
            // Advance the logical end together with the buffered write so a failed
            // sync's resync (which overwrites `offset`) and the success path agree.
            *offset += DELETE_RECORD_LEN as u64;
            Self::sync_wal_or_resync(&mut wal, self.durability, &mut offset)?;
            index.remove(&id);
        }
        self.maybe_auto_snapshot();
        Ok(())
    }

    /// Explicitly pushes buffered log bytes to the OS. In `Fsync` mode it also
    /// syncs the file to stable storage.
    ///
    /// Unlike the per-write policy, an explicit flush is never a no-op: with
    /// `DurabilityMode::None` the buffer is still flushed, so data does not stay
    /// in memory after the caller asked for it to be written.
    fn flush(&mut self) -> io::Result<()> {
        let mode = match self.durability {
            DurabilityMode::Fsync => DurabilityMode::Fsync,
            DurabilityMode::FlushOnly | DurabilityMode::None => DurabilityMode::FlushOnly,
        };
        let mut wal = self.wal.write();
        let mut offset = self.write_offset.write();
        Self::sync_wal_or_resync(&mut wal, mode, &mut offset)
    }

    /// Returns the currently indexed ids, in unspecified (hash-map) order.
    fn ids(&self) -> Vec<u64> {
        self.index.read().keys().copied().collect()
    }
}