use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use crate::storage::codec::{decode_op, encode_op};
use crate::storage::{Op, OpRef};
use crate::{Error, Result};
/// Capacity, in bytes, of the `BufWriter` that `Wal::open` wraps around the log file (64 KiB).
const WAL_BUFFER_BYTES: usize = 64 * 1024;
/// Replay-time bookkeeping for a batch whose `BatchBegin` record has been decoded but whose
/// `BatchEnd` record has not been seen yet.
#[derive(Debug)]
struct OpenBatch {
/// Transaction id taken from the `BatchBegin` record; compared with the id on `BatchEnd`.
tx_id: u64,
/// Operation count declared by the `BatchBegin` record (`op_count`).
expected_count: u32,
/// Operations decoded since `BatchBegin`, held back from the sink until the batch closes.
buffered_ops: Vec<Op>,
/// Byte offset of the `BatchBegin` record; the log is cut back to here if the batch is rejected.
start_cursor: usize,
}
/// Append-only log of encoded operations backed by a single file.
#[derive(Debug)]
pub(crate) struct Wal {
/// Buffered writer over the log file; created with `WAL_BUFFER_BYTES` of capacity.
writer: BufWriter<File>,
/// Path the log was opened from, exposed through `path()`.
path: PathBuf,
/// Reusable buffer that `append` clears and encodes each record into before writing.
scratch: Vec<u8>,
/// Bytes passed to `append` since the counter was last reset by `flush` or a truncation;
/// accumulated with saturating addition.
pending: u64,
}
impl Wal {
/// Opens the log at `path` for reading and writing, creating the file if it does not exist.
///
/// Existing contents are kept (`truncate(false)`) and the file position is moved to the end
/// so that later appends extend the log.
///
/// # Errors
///
/// Returns the error produced by opening the file or by seeking to its end.
pub(crate) fn open(path: impl Into<PathBuf>) -> Result<Self> {
    let wal_path: PathBuf = path.into();
    let mut handle = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&wal_path)?;
    // Position at the end of whatever is already on disk before buffering begins.
    let _end = handle.seek(SeekFrom::End(0))?;
    Ok(Self {
        writer: BufWriter::with_capacity(WAL_BUFFER_BYTES, handle),
        path: wal_path,
        scratch: Vec::with_capacity(256),
        pending: 0,
    })
}
/// Derives the log path that sits next to `page_path`.
///
/// The result replaces the final component of `page_path` with `"<file name>.wal"`. When
/// `page_path` has no final file name, or that name is not valid UTF-8, the fixed name
/// `"emdb.wal"` is used instead.
pub(crate) fn path_for(page_path: &Path) -> PathBuf {
    // Both closure results are non-empty, so no separate empty-name fallback is required.
    let wal_name = page_path
        .file_name()
        .and_then(|name| name.to_str())
        .map_or_else(|| String::from("emdb.wal"), |name| format!("{name}.wal"));
    let mut wal_path = page_path.to_path_buf();
    // `set_file_name` swaps the last component, or pushes one when there is none.
    wal_path.set_file_name(wal_name);
    wal_path
}
/// Encodes `op` and writes the bytes into the buffered writer.
///
/// Nothing is forced to disk here; call `flush` for that. The encoded length is added to
/// `pending` with saturating arithmetic.
///
/// # Errors
///
/// Returns the error produced by writing to the buffered writer.
pub(crate) fn append(&mut self, op: OpRef<'_>) -> Result<()> {
    // Reuse one allocation for every record.
    let buffer = &mut self.scratch;
    buffer.clear();
    encode_op(&mut *buffer, op);
    self.writer.write_all(buffer.as_slice())?;
    let written = buffer.len() as u64;
    self.pending = self.pending.saturating_add(written);
    Ok(())
}
/// Flushes the buffered writer, calls `sync_data` on the file, and resets `pending` to zero.
///
/// # Errors
///
/// Returns the error produced by the buffer flush or by `sync_data`.
pub(crate) fn flush(&mut self) -> Result<()> {
    // Empty the in-process buffer first so the sync covers every appended byte.
    self.writer.flush()?;
    let file = self.writer.get_ref();
    file.sync_data()?;
    self.pending = 0;
    Ok(())
}
/// Reports whether the log currently holds no bytes.
///
/// Returns `Ok(false)` while appended bytes are still counted in `pending`; otherwise the
/// answer is whether the file's length on disk is zero.
///
/// # Errors
///
/// Returns the error produced by querying the file metadata.
pub(crate) fn is_empty(&self) -> Result<bool> {
    if self.pending == 0 {
        let on_disk = self.writer.get_ref().metadata()?.len();
        Ok(on_disk == 0)
    } else {
        Ok(false)
    }
}
/// Replays the log into `sink` by delegating to `replay_impl`.
///
/// # Errors
///
/// Returns whatever `replay_impl` returns.
pub(crate) fn replay(&mut self, sink: &mut dyn FnMut(Op) -> Result<()>) -> Result<()> {
self.replay_impl(sink)
}
/// Replays the log into `sink` and, if that succeeds, empties the log file.
///
/// # Errors
///
/// Returns the replay error without truncating, or the error from `truncate`.
pub(crate) fn apply_and_truncate(
    &mut self,
    sink: &mut dyn FnMut(Op) -> Result<()>,
) -> Result<()> {
    match self.replay_impl(sink) {
        // Only discard the log once every record has been handed over.
        Ok(()) => self.truncate(),
        Err(err) => Err(err),
    }
}
pub(crate) fn truncate(&mut self) -> Result<()> {
self.writer.flush()?;
let file = self.writer.get_mut();
file.set_len(0)?;
let _offset = file.seek(SeekFrom::Start(0))?;
self.pending = 0;
self.flush()
}
/// Returns the path this log was opened with.
#[must_use]
pub(crate) fn path(&self) -> &Path {
&self.path
}
/// Shared replay routine behind `replay` and `apply_and_truncate`.
///
/// Flushes buffered writes, reads the whole log file into memory, and decodes it record by
/// record with `decode_op`. Operations outside a batch are passed to `sink` as soon as they
/// are decoded. Operations between `BatchBegin` and `BatchEnd` are buffered and passed to
/// `sink` only when a `BatchEnd` with the same `tx_id` arrives and the number of buffered
/// operations equals the declared `op_count`.
///
/// When the log stops being valid, the file is cut back with `truncate_to` and the function
/// returns `Ok(())`. That happens for: a `BatchBegin` inside an open batch, a `BatchEnd`
/// with no open batch, a `tx_id` or count mismatch, a `Corrupted` decode error, or a batch
/// still open at end of file.
///
/// # Errors
///
/// Propagates errors from flushing, seeking, reading and truncating the file, any
/// non-`Corrupted` error from `decode_op`, and any error returned by `sink`.
fn replay_impl(&mut self, sink: &mut dyn FnMut(Op) -> Result<()>) -> Result<()> {
// Push buffered appends to the file so the read below sees them.
self.writer.flush()?;
let file = self.writer.get_mut();
let _offset = file.seek(SeekFrom::Start(0))?;
let mut bytes = Vec::new();
let _read = file.read_to_end(&mut bytes)?;
// `cursor` is the byte offset of the next record to decode.
let mut cursor = 0_usize;
let mut open_batch: Option<OpenBatch> = None;
// NOTE(review): the loop only advances by `consumed`; this assumes `decode_op` never
// reports zero bytes consumed on success — confirm against the codec.
while cursor < bytes.len() {
let record_start = cursor;
match decode_op(&bytes[cursor..]) {
Ok((op, consumed)) => {
cursor = cursor.checked_add(consumed).ok_or(Error::Corrupted {
offset: cursor as u64,
reason: "cursor overflow",
})?;
match op {
Op::BatchBegin { tx_id, op_count } => {
// A begin inside an open batch: drop everything from the first begin onwards.
if let Some(existing) = &open_batch {
self.truncate_to(existing.start_cursor)?;
return Ok(());
}
open_batch = Some(OpenBatch {
tx_id,
expected_count: op_count,
buffered_ops: Vec::new(),
start_cursor: record_start,
});
}
Op::BatchEnd { tx_id } => {
// An end with no open batch: cut the log at this record.
let Some(batch) = open_batch.take() else {
self.truncate_to(record_start)?;
return Ok(());
};
// The end must close the batch that is actually open.
if batch.tx_id != tx_id {
self.truncate_to(batch.start_cursor)?;
return Ok(());
}
// The batch must contain exactly the number of operations it declared.
if batch.buffered_ops.len() != batch.expected_count as usize {
self.truncate_to(batch.start_cursor)?;
return Ok(());
}
// The batch is complete: hand its operations to the sink in order.
for buffered in batch.buffered_ops {
sink(buffered)?;
}
}
other => {
// Inside a batch the operation is held back; outside it is applied immediately.
if let Some(batch) = &mut open_batch {
batch.buffered_ops.push(other);
} else {
sink(other)?;
}
}
}
}
Err(Error::Corrupted { .. }) => {
// `cursor` was not advanced, so it still marks the start of the undecodable record.
// An open batch is discarded as a whole by cutting at its begin record instead.
let truncate_cursor = open_batch
.as_ref()
.map_or(cursor, |batch| batch.start_cursor);
self.truncate_to(truncate_cursor)?;
return Ok(());
}
Err(other) => return Err(other),
}
}
// End of file reached with a batch still open: discard the unfinished batch.
if let Some(batch) = open_batch {
self.truncate_to(batch.start_cursor)?;
return Ok(());
}
// Leave the file position at the end so later appends extend the log.
let _offset = self.writer.get_mut().seek(SeekFrom::End(0))?;
Ok(())
}
/// Cuts the log file back to `cursor` bytes, moves the file position there, resets
/// `pending`, and flushes.
///
/// # Errors
///
/// Returns the error produced by resizing, seeking, or flushing.
fn truncate_to(&mut self, cursor: usize) -> Result<()> {
    let new_len = cursor as u64;
    {
        let file = self.writer.get_mut();
        file.set_len(new_len)?;
        // Later writes must start at the new end of the file.
        let _position = file.seek(SeekFrom::Start(new_len))?;
    }
    self.pending = 0;
    self.flush()
}
}
#[cfg(test)]
mod tests {
use super::Wal;
use crate::storage::codec::encode_op;
use crate::storage::OpRef;
/// Builds a log path inside the system temp directory for the test called `name`.
///
/// The file name embeds the nanoseconds since the Unix epoch (or `0` if the clock reads
/// earlier than the epoch) so separate runs get separate files.
fn test_path(name: &str) -> std::path::PathBuf {
    use std::time::{SystemTime, UNIX_EPOCH};

    let stamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0_u128,
    };
    std::env::temp_dir().join(format!("emdb-wal-{name}-{stamp}.emdb.wal"))
}
/// Appends an insert and a remove, flushes, and expects replay to pass both to the sink.
#[test]
fn test_append_and_replay_round_trip() {
    let path = test_path("replay");
    // Match on the result directly: an `assert!(wal.is_ok())` placed first would panic
    // before this arm could print the underlying error.
    let mut wal = match Wal::open(&path) {
        Ok(wal) => wal,
        Err(err) => panic!("wal open should succeed: {err}"),
    };
    if let Err(err) = wal.append(OpRef::Insert {
        key: b"a",
        value: b"1",
        expires_at: None,
    }) {
        panic!("insert append should succeed: {err}");
    }
    if let Err(err) = wal.append(OpRef::Remove { key: b"a" }) {
        panic!("remove append should succeed: {err}");
    }
    if let Err(err) = wal.flush() {
        panic!("wal flush should succeed: {err}");
    }
    let mut seen = Vec::new();
    let replayed = wal.replay(&mut |op| {
        seen.push(op);
        Ok(())
    });
    if let Err(err) = replayed {
        panic!("wal replay should succeed: {err}");
    }
    assert_eq!(seen.len(), 2);
    // Best-effort cleanup; a failure to delete does not fail the test.
    let _removed = std::fs::remove_file(path);
}
/// Appends one record, applies the log, and expects the file to be empty afterwards.
#[test]
fn test_apply_and_truncate_clears_file() {
    let path = test_path("truncate");
    // Match on the result directly: an `assert!(wal.is_ok())` placed first would panic
    // before this arm could print the underlying error.
    let mut wal = match Wal::open(&path) {
        Ok(wal) => wal,
        Err(err) => panic!("wal open should succeed: {err}"),
    };
    if let Err(err) = wal.append(OpRef::Clear) {
        panic!("clear append should succeed: {err}");
    }
    if let Err(err) = wal.flush() {
        panic!("wal flush should succeed: {err}");
    }
    let mut seen = 0_usize;
    let applied = wal.apply_and_truncate(&mut |_op| {
        seen += 1;
        Ok(())
    });
    if let Err(err) = applied {
        panic!("apply_and_truncate should succeed: {err}");
    }
    assert_eq!(seen, 1);
    let wal_len = match std::fs::metadata(&path) {
        Ok(meta) => meta.len(),
        Err(err) => panic!("wal metadata should be readable: {err}"),
    };
    assert_eq!(wal_len, 0);
    // Best-effort cleanup; a failure to delete does not fail the test.
    let _removed = std::fs::remove_file(path);
}
/// Writes a record with its last three bytes missing and expects replay to cut the file
/// back to zero length.
#[test]
fn test_torn_tail_is_truncated_on_replay() {
    let path = test_path("torn");
    // Match on the result directly: an `assert!(wal.is_ok())` placed first would panic
    // before this arm could print the underlying error.
    let mut wal = match Wal::open(&path) {
        Ok(wal) => wal,
        Err(err) => panic!("wal open should succeed: {err}"),
    };
    let mut record = Vec::new();
    encode_op(
        &mut record,
        OpRef::Insert {
            key: b"k",
            value: b"v",
            expires_at: None,
        },
    );
    // Write the truncated record straight to the file, bypassing the log's own writer.
    let torn = &record[..record.len() - 3];
    if let Err(err) = std::fs::write(&path, torn) {
        panic!("writing the torn record should succeed: {err}");
    }
    if let Err(err) = wal.replay(&mut |_op| Ok(())) {
        panic!("wal replay should succeed: {err}");
    }
    let wal_len = match std::fs::metadata(&path) {
        Ok(meta) => meta.len(),
        Err(err) => panic!("wal metadata should be readable: {err}"),
    };
    assert_eq!(wal_len, 0);
    // Best-effort cleanup; a failure to delete does not fail the test.
    let _removed = std::fs::remove_file(path);
}
/// Writes a `BatchBegin` and one insert with no `BatchEnd`, then expects replay to apply
/// nothing and cut the file back to zero length.
#[test]
fn test_incomplete_batch_is_truncated_without_applying() {
    let path = test_path("batch");
    // Match on the result directly: an `assert!(wal.is_ok())` placed first would panic
    // before this arm could print the underlying error.
    let mut wal = match Wal::open(&path) {
        Ok(wal) => wal,
        Err(err) => panic!("wal open should succeed: {err}"),
    };
    if let Err(err) = wal.append(OpRef::BatchBegin {
        tx_id: 7,
        op_count: 1,
    }) {
        panic!("batch begin append should succeed: {err}");
    }
    if let Err(err) = wal.append(OpRef::Insert {
        key: b"k",
        value: b"v",
        expires_at: None,
    }) {
        panic!("insert append should succeed: {err}");
    }
    if let Err(err) = wal.flush() {
        panic!("wal flush should succeed: {err}");
    }
    let mut seen = 0_usize;
    let replayed = wal.replay(&mut |_op| {
        seen += 1;
        Ok(())
    });
    if let Err(err) = replayed {
        panic!("wal replay should succeed: {err}");
    }
    assert_eq!(seen, 0);
    let wal_len = match std::fs::metadata(&path) {
        Ok(meta) => meta.len(),
        Err(err) => panic!("wal metadata should be readable: {err}"),
    };
    assert_eq!(wal_len, 0);
    // Best-effort cleanup; a failure to delete does not fail the test.
    let _removed = std::fs::remove_file(path);
}
}