use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use rkyv::{
api::high, rancor::Error as RkyvError, util::AlignedVec, Archive,
Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
/// Record types persisted in the write-ahead log.
///
/// These types derive rkyv's `Archive`/`Serialize`/`Deserialize`, so their
/// archived (on-disk) representation is generated from the declarations below.
/// NOTE(review): reordering variants or fields presumably changes the archived
/// layout and would make existing WAL files unreadable — confirm before editing.
mod wal_types {
    #![allow(missing_docs)]
    use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
    // `HashMap` is flagged by the project's clippy configuration; it is used
    // here only as the payload type of `WalEntry::Commit::offsets`.
    #[allow(clippy::disallowed_types)] use std::collections::HashMap;
    /// One logical record appended to the WAL.
    ///
    /// NOTE(review): the meaning of each variant is defined by whatever code
    /// replays the log (not visible here); the docs below only describe payloads.
    #[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
    pub enum WalEntry {
        /// A `Put` record carrying a raw key and its raw value.
        Put {
            key: Vec<u8>,
            value: Vec<u8>,
        },
        /// A `Delete` record carrying only the raw key.
        Delete {
            key: Vec<u8>,
        },
        /// A checkpoint marker identified by a numeric `id`.
        Checkpoint {
            id: u64,
        },
        /// A commit record: a map from names to `u64` offsets plus an optional
        /// `i64` watermark. NOTE(review): the tests use topic names as keys —
        /// presumably source/topic offsets; confirm against the caller.
        Commit {
            offsets: HashMap<String, u64>,
            watermark: Option<i64>,
        },
    }
}
pub use wal_types::WalEntry;
/// Size of the framing header in front of every payload: a little-endian `u32`
/// payload length followed by a little-endian `u32` CRC32C of the payload.
const RECORD_HEADER_SIZE: u64 = (2 * std::mem::size_of::<u32>()) as u64;
/// Largest payload length (256 MiB) the readers accept; a header claiming more
/// is reported as corrupted.
const MAX_WAL_ENTRY_SIZE: u64 = 256 << 20;
/// A typed WAL offset.
///
/// NOTE(review): nothing in this file constructs or consumes `WalPosition` —
/// `append`, `read_from` and `truncate` all use raw `u64` offsets — so it is
/// presumably used by callers elsewhere; confirm. `#[rkyv(compare(PartialEq))]`
/// additionally lets the archived form be compared with a native value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Archive, RkyvSerialize, RkyvDeserialize)]
#[rkyv(compare(PartialEq))]
pub struct WalPosition {
    /// Presumably a byte offset into the WAL file, as returned by
    /// `WriteAheadLog::append` — TODO confirm.
    pub offset: u64,
}
/// Append-only log of [`WalEntry`] records stored in a single file.
///
/// Each record on disk is an 8-byte header (little-endian `u32` payload length,
/// then little-endian `u32` CRC32C of the payload) followed by the
/// rkyv-serialized entry.
pub struct WriteAheadLog {
    /// Location of the log file; reopened by `read_from`, `truncate` and `repair`.
    path: PathBuf,
    /// Buffered handle to the log file, opened in append mode.
    writer: BufWriter<File>,
    /// Logical end of the log: the offset the next `append` returns.
    position: u64,
    /// When `true`, every `append` is followed by a sync.
    sync_on_write: bool,
    /// Elapsed time after which the next `append` also triggers a sync.
    sync_interval: Duration,
    /// When the most recent successful `sync` finished.
    last_sync: Instant,
    /// Scratch space where header and payload are assembled for a single write.
    write_buffer: Vec<u8>,
    /// Reused rkyv output buffer, so serialization does not reallocate per entry.
    serialize_buffer: AlignedVec,
}
/// Errors reported by [`WriteAheadLog`] and by iterating a [`WalReader`].
///
/// For the positional variants, `position` is the byte offset at which the
/// affected record starts.
#[derive(Debug, thiserror::Error)]
pub enum WalError {
    /// A file-system operation failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// rkyv could not serialize an entry, or the serialized entry was too large.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// A payload passed its checksum but could not be decoded as a `WalEntry`.
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// A record header is implausible; in this module, a length field larger
    /// than the maximum entry size.
    #[error("Corrupted WAL entry at position {position}: {reason}")]
    Corrupted { position: u64, reason: String },

    /// The CRC32C stored in the header (`expected`) differs from the one
    /// computed over the payload (`actual`).
    #[error("CRC32 checksum mismatch at position {position}: expected {expected:#010x}, got {actual:#010x}")]
    ChecksumMismatch { position: u64, expected: u32, actual: u32 },

    /// The file ends part-way through a record's header or payload.
    #[error("Torn write detected at position {position}: {reason}")]
    TornWrite { position: u64, reason: String },
}
impl WriteAheadLog {
    /// Opens the log at `path`, creating the file if needed, and prepares to append.
    ///
    /// The logical position starts at the current file length, so new records
    /// follow whatever is already on disk. Existing contents are not validated;
    /// call [`WriteAheadLog::repair`] to drop a damaged tail.
    ///
    /// `sync_interval` is the elapsed time after which an `append` also flushes
    /// and syncs the file.
    ///
    /// # Errors
    /// Returns [`WalError::Io`] if the file cannot be opened or its metadata read.
    pub fn new<P: AsRef<Path>>(path: P, sync_interval: Duration) -> Result<Self, WalError> {
        let path = path.as_ref().to_path_buf();
        let file = OpenOptions::new().create(true).append(true).open(&path)?;
        let position = file.metadata()?.len();
        Ok(Self {
            writer: BufWriter::new(file),
            path,
            sync_interval,
            last_sync: Instant::now(),
            position,
            sync_on_write: false,
            write_buffer: Vec::with_capacity(4096),
            serialize_buffer: AlignedVec::with_capacity(256),
        })
    }

    /// Enables or disables syncing the file after every `append`.
    pub fn set_sync_on_write(&mut self, enabled: bool) {
        self.sync_on_write = enabled;
    }

    /// Serializes `entry`, appends it as one framed record and returns the byte
    /// offset at which that record starts.
    ///
    /// The record may stay in the internal `BufWriter` until the next flush. It
    /// is synced here only when `sync_on_write` is set or `sync_interval` has
    /// elapsed since the last sync; otherwise call [`WriteAheadLog::sync`].
    ///
    /// # Errors
    /// - [`WalError::Serialization`] if rkyv fails, or if the serialized entry is
    ///   larger than `MAX_WAL_ENTRY_SIZE`. Readers reject such records as
    ///   corrupted, so they are refused here.
    /// - [`WalError::Io`] if writing or syncing fails. NOTE(review): a failed
    ///   write may leave part of the record buffered or on disk; run
    ///   [`WriteAheadLog::repair`] before trusting the log again.
    pub fn append(&mut self, entry: &WalEntry) -> Result<u64, WalError> {
        // The length header is a u32; the size limit below guarantees it fits.
        const _: () = assert!(MAX_WAL_ENTRY_SIZE <= u32::MAX as u64);

        let start_pos = self.position;

        // Serialize into the reusable aligned buffer instead of a fresh allocation.
        self.serialize_buffer.clear();
        let scratch = std::mem::take(&mut self.serialize_buffer);
        let bytes = high::to_bytes_in::<_, RkyvError>(entry, scratch)
            .map_err(|e| WalError::Serialization(e.to_string()))?;

        // Apply the same limit the readers enforce, so every record written can
        // be read back instead of being reported as `Corrupted`.
        let payload_len = bytes.len() as u64;
        if payload_len > MAX_WAL_ENTRY_SIZE {
            return Err(WalError::Serialization(format!(
                "Entry too large: {} bytes (max {})",
                bytes.len(),
                MAX_WAL_ENTRY_SIZE
            )));
        }
        // Lossless: payload_len <= MAX_WAL_ENTRY_SIZE <= u32::MAX (asserted above).
        #[allow(clippy::cast_possible_truncation)]
        let len = payload_len as u32;
        let crc = crc32c::crc32c(&bytes);

        // Frame the record as len | crc | payload so it goes out in one write_all.
        #[allow(clippy::cast_possible_truncation)]
        let header_len = RECORD_HEADER_SIZE as usize;
        self.write_buffer.clear();
        self.write_buffer.reserve(header_len + bytes.len());
        self.write_buffer.extend_from_slice(&len.to_le_bytes());
        self.write_buffer.extend_from_slice(&crc.to_le_bytes());
        self.write_buffer.extend_from_slice(&bytes);
        // Return the serialization buffer before any fallible I/O so its
        // capacity is reused even if the write below fails.
        self.serialize_buffer = bytes;

        self.writer.write_all(&self.write_buffer)?;
        self.position += RECORD_HEADER_SIZE + payload_len;

        if self.sync_on_write || self.last_sync.elapsed() >= self.sync_interval {
            self.sync()?;
        }
        Ok(start_pos)
    }

    /// Flushes buffered records to the OS and calls `sync_data` on the file.
    ///
    /// The last-sync timestamp is only advanced when both steps succeed.
    ///
    /// # Errors
    /// Returns [`WalError::Io`] if flushing or syncing fails.
    pub fn sync(&mut self) -> Result<(), WalError> {
        self.writer.flush()?;
        self.writer.get_ref().sync_data()?;
        self.last_sync = Instant::now();
        Ok(())
    }

    /// Opens an independent reader starting at byte offset `position`.
    ///
    /// The reader is bounded by the file length at the time of this call.
    /// Records still held in this writer's buffer are not visible, so call
    /// [`WriteAheadLog::sync`] first if they must be read. `position` should be
    /// a record boundary (e.g. a value returned by `append`); a position at or
    /// past the end of the file reads as `Eof`.
    ///
    /// # Errors
    /// Returns [`WalError::Io`] if the file cannot be opened, inspected or seeked.
    pub fn read_from(&self, position: u64) -> Result<WalReader, WalError> {
        let file = File::open(&self.path)?;
        let file_len = file.metadata()?.len();
        let mut reader = BufReader::new(file);
        reader.seek(SeekFrom::Start(position))?;
        Ok(WalReader {
            reader,
            position,
            file_len,
        })
    }

    /// Logical end of the log, i.e. the offset the next `append` will return.
    #[must_use]
    pub fn position(&self) -> u64 {
        self.position
    }

    /// Path of the underlying log file.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Discards everything from byte offset `position` onward and continues
    /// appending from there.
    ///
    /// Buffered records are synced first. `position` should be a record
    /// boundary; this is not checked.
    ///
    /// # Errors
    /// - [`WalError::Io`] with kind `InvalidInput` if `position` lies beyond the
    ///   current end of the file. `set_len` would otherwise zero-extend the file,
    ///   and the zero-filled headers pass the CRC32C check but cannot be decoded.
    /// - [`WalError::Io`] if syncing, resizing or reopening the file fails.
    pub fn truncate(&mut self, position: u64) -> Result<(), WalError> {
        self.sync()?;
        let file = OpenOptions::new()
            .write(true)
            .truncate(false)
            .open(&self.path)?;
        let current_len = file.metadata()?.len();
        if position > current_len {
            return Err(WalError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "cannot truncate WAL to {position} bytes: file is only {current_len} bytes long"
                ),
            )));
        }
        file.set_len(position)?;
        file.sync_all()?;
        // Reopen in append mode so subsequent writes land at the new end.
        let file = OpenOptions::new().append(true).open(&self.path)?;
        self.writer = BufWriter::new(file);
        self.position = position;
        Ok(())
    }

    /// Scans the log from the start and truncates it after the last intact
    /// record, returning the resulting length.
    ///
    /// Scanning stops at the first torn record (header or payload cut short) or
    /// checksum mismatch. Everything from that point on is removed, even valid
    /// records that follow a mismatch. Afterwards [`WriteAheadLog::position`]
    /// equals the returned length.
    ///
    /// # Errors
    /// - [`WalError::Corrupted`] if a header's length field exceeds
    ///   `MAX_WAL_ENTRY_SIZE`; nothing is truncated in that case.
    /// - [`WalError::Io`] for other I/O failures.
    pub fn repair(&mut self) -> Result<u64, WalError> {
        self.sync()?;
        let file = File::open(&self.path)?;
        let file_len = file.metadata()?.len();
        let mut reader = BufReader::new(file);
        let mut valid_position: u64 = 0;
        loop {
            match Self::validate_record(&mut reader, valid_position, file_len) {
                Ok(record_len) => valid_position += record_len,
                // A damaged or incomplete record ends the valid prefix.
                Err(WalError::TornWrite { .. } | WalError::ChecksumMismatch { .. }) => break,
                // Clean end of file, or a short read.
                Err(WalError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        if valid_position < file_len {
            self.truncate(valid_position)?;
        }
        // The file now ends exactly at `valid_position` (either it was truncated
        // or it already did), so keep the logical position in step even when no
        // truncation was necessary.
        self.position = valid_position;
        Ok(valid_position)
    }

    /// Reads and checks the record starting at `position`, returning its total
    /// size (header included) if it is intact.
    ///
    /// Only framing and the CRC32C are checked; the payload is not deserialized.
    /// `reader` must already be positioned at `position`.
    ///
    /// # Errors
    /// - `WalError::Io` with kind `UnexpectedEof` when `position` is exactly the
    ///   end of the file (the clean-stop signal used by `repair`).
    /// - `WalError::TornWrite` when less than a full header or payload remains.
    /// - `WalError::Corrupted` when the length field exceeds `MAX_WAL_ENTRY_SIZE`.
    /// - `WalError::ChecksumMismatch` when the payload's CRC32C differs.
    fn validate_record(
        reader: &mut BufReader<File>,
        position: u64,
        file_len: u64,
    ) -> Result<u64, WalError> {
        let remaining = file_len.saturating_sub(position);
        if remaining < RECORD_HEADER_SIZE {
            if remaining == 0 {
                return Err(WalError::Io(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "end of file",
                )));
            }
            return Err(WalError::TornWrite {
                position,
                reason: format!(
                    "incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
                ),
            });
        }
        let mut len_bytes = [0u8; 4];
        reader.read_exact(&mut len_bytes)?;
        let len = u64::from(u32::from_le_bytes(len_bytes));
        let mut crc_bytes = [0u8; 4];
        reader.read_exact(&mut crc_bytes)?;
        let expected_crc = u32::from_le_bytes(crc_bytes);
        if len > MAX_WAL_ENTRY_SIZE {
            return Err(WalError::Corrupted {
                position,
                reason: format!(
                    "[LDB-6006] WAL entry length {len} exceeds maximum \
                     {MAX_WAL_ENTRY_SIZE} bytes — likely corrupted"
                ),
            });
        }
        let data_remaining = remaining - RECORD_HEADER_SIZE;
        if data_remaining < len {
            return Err(WalError::TornWrite {
                position,
                reason: format!(
                    "incomplete data: only {data_remaining} bytes remaining, need {len}"
                ),
            });
        }
        // Bounded by MAX_WAL_ENTRY_SIZE above, so the cast cannot truncate.
        #[allow(clippy::cast_possible_truncation)]
        let mut data = vec![0u8; len as usize];
        reader.read_exact(&mut data)?;
        let actual_crc = crc32c::crc32c(&data);
        if actual_crc != expected_crc {
            return Err(WalError::ChecksumMismatch {
                position,
                expected: expected_crc,
                actual: actual_crc,
            });
        }
        Ok(RECORD_HEADER_SIZE + len)
    }
}
/// Sequential reader over WAL records, created by `WriteAheadLog::read_from`.
///
/// Records are decoded by `read_next` or by iterating the reader.
pub struct WalReader {
    /// Byte offset of the next unread byte, kept in step with `reader`.
    position: u64,
    /// End of the readable region (initially the file length when the reader
    /// was opened); nothing at or beyond it is read.
    file_len: u64,
    /// Buffered file handle positioned at `position`.
    reader: BufReader<File>,
}

impl WalReader {
    /// Current read offset. After a record is decoded successfully, this is the
    /// start of the following record.
    #[must_use]
    pub fn position(&self) -> u64 {
        let Self { position, .. } = self;
        *position
    }
}
/// Outcome of `WalReader::read_next`.
///
/// The result is either a decoded entry, a clean end of data, or a description
/// of why the record could not be used. In every variant carrying a `position`,
/// that value is the byte offset at which the affected record starts.
#[derive(Debug)]
pub enum WalReadResult {
    /// A record that passed framing and checksum checks and decoded successfully.
    Entry(WalEntry),
    /// No bytes remain in the readable region.
    Eof,
    /// The data ends part-way through a record's header or payload.
    TornWrite { position: u64, reason: String },
    /// The CRC32C stored in the header (`expected`) differs from the one
    /// computed over the payload (`actual`).
    ChecksumMismatch { position: u64, expected: u32, actual: u32 },
    /// The header's length field exceeds the maximum entry size.
    Corrupted { position: u64, reason: String },
}
impl WalReader {
pub fn read_next(&mut self) -> Result<WalReadResult, WalError> {
let remaining = self.file_len.saturating_sub(self.position);
if remaining == 0 {
return Ok(WalReadResult::Eof);
}
if remaining < RECORD_HEADER_SIZE {
return Ok(WalReadResult::TornWrite {
position: self.position,
reason: format!(
"incomplete header: only {remaining} bytes remaining, need {RECORD_HEADER_SIZE}"
),
});
}
let record_start = self.position;
let mut len_bytes = [0u8; 4];
self.reader.read_exact(&mut len_bytes)?;
let len = u64::from(u32::from_le_bytes(len_bytes));
self.position += 4;
let mut crc_bytes = [0u8; 4];
self.reader.read_exact(&mut crc_bytes)?;
let expected_crc = u32::from_le_bytes(crc_bytes);
self.position += 4;
if len > MAX_WAL_ENTRY_SIZE {
return Ok(WalReadResult::Corrupted {
position: record_start,
reason: format!(
"[LDB-6006] WAL entry length {len} exceeds maximum \
{MAX_WAL_ENTRY_SIZE} bytes — likely corrupted"
),
});
}
let data_remaining = self.file_len.saturating_sub(self.position);
if data_remaining < len {
return Ok(WalReadResult::TornWrite {
position: record_start,
reason: format!(
"incomplete data: only {data_remaining} bytes remaining, need {len}"
),
});
}
#[allow(clippy::cast_possible_truncation)] let mut data = vec![0u8; len as usize];
self.reader.read_exact(&mut data)?;
self.position += len;
let actual_crc = crc32c::crc32c(&data);
if actual_crc != expected_crc {
return Ok(WalReadResult::ChecksumMismatch {
position: record_start,
expected: expected_crc,
actual: actual_crc,
});
}
match rkyv::from_bytes::<WalEntry, RkyvError>(&data) {
Ok(entry) => Ok(WalReadResult::Entry(entry)),
Err(e) => Err(WalError::Deserialization(e.to_string())),
}
}
}
impl Iterator for WalReader {
    type Item = Result<WalEntry, WalError>;

    /// Yields the next decoded entry, `None` at the end of the data, or
    /// `Some(Err(..))` for a damaged record.
    ///
    /// Some failures leave the cursor off a record boundary: a torn write, an
    /// oversized (corrupted) length header, or an I/O error. After one of these
    /// the iterator fuses and every later call returns `None`. Without this, a
    /// torn header would be reported forever, because `read_next` never
    /// advances past it, and a `for` loop over the reader would never end.
    /// Checksum and deserialization failures consume the whole record, so
    /// iteration continues with the following record.
    fn next(&mut self) -> Option<Self::Item> {
        let (item, fuse) = match self.read_next() {
            Ok(WalReadResult::Entry(entry)) => (Ok(entry), false),
            Ok(WalReadResult::Eof) => return None,
            Ok(WalReadResult::ChecksumMismatch {
                position,
                expected,
                actual,
            }) => (
                Err(WalError::ChecksumMismatch {
                    position,
                    expected,
                    actual,
                }),
                false,
            ),
            Ok(WalReadResult::TornWrite { position, reason }) => {
                (Err(WalError::TornWrite { position, reason }), true)
            }
            Ok(WalReadResult::Corrupted { position, reason }) => {
                (Err(WalError::Corrupted { position, reason }), true)
            }
            Err(e) => {
                let fuse = matches!(e, WalError::Io(_));
                (Err(e), fuse)
            }
        };
        if fuse {
            // Shrink the readable region to the cursor: `read_next` then reports
            // `Eof`, while `position()` keeps its current value.
            self.file_len = self.position;
        }
        Some(item)
    }
}
#[cfg(test)]
mod tests {
    #[allow(clippy::disallowed_types)] use std::collections::HashMap;
    use super::*;
    use tempfile::NamedTempFile;

    /// Opens a WAL on `temp_file` with a one-second sync interval.
    fn open_wal(temp_file: &NamedTempFile) -> WriteAheadLog {
        WriteAheadLog::new(temp_file.path(), Duration::from_secs(1)).unwrap()
    }

    /// Builds a `Put` entry from byte strings.
    fn put(key: &[u8], value: &[u8]) -> WalEntry {
        WalEntry::Put {
            key: key.to_vec(),
            value: value.to_vec(),
        }
    }

    /// Reads every entry from `position` onward, panicking on the first error.
    fn read_all(wal: &WriteAheadLog, position: u64) -> Vec<WalEntry> {
        wal.read_from(position)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    /// Returns the key of a `Put` entry, panicking on any other variant.
    fn put_key(entry: &WalEntry) -> &[u8] {
        match entry {
            WalEntry::Put { key, .. } => key,
            other => panic!("Expected Put entry, got {other:?}"),
        }
    }

    /// Returns the keys of a sequence of `Put` entries.
    fn put_keys(entries: &[WalEntry]) -> Vec<&[u8]> {
        entries.iter().map(put_key).collect()
    }

    /// Overwrites bytes of the WAL file at `offset`, bypassing the WAL.
    fn overwrite_at(temp_file: &NamedTempFile, offset: u64, bytes: &[u8]) {
        let mut file = OpenOptions::new()
            .write(true)
            .open(temp_file.path())
            .unwrap();
        file.seek(SeekFrom::Start(offset)).unwrap();
        file.write_all(bytes).unwrap();
        file.sync_all().unwrap();
    }

    /// Appends raw bytes to the end of the WAL file, bypassing the WAL.
    fn append_raw(temp_file: &NamedTempFile, bytes: &[u8]) {
        let mut file = OpenOptions::new()
            .append(true)
            .open(temp_file.path())
            .unwrap();
        file.write_all(bytes).unwrap();
        file.sync_all().unwrap();
    }

    #[test]
    fn test_wal_append_and_read() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        let pos1 = wal.append(&put(b"key1", b"value1")).unwrap();
        let _pos2 = wal
            .append(&WalEntry::Delete {
                key: b"key2".to_vec(),
            })
            .unwrap();
        wal.sync().unwrap();
        assert_eq!(pos1, 0);

        let entries = read_all(&wal, pos1);
        assert_eq!(entries.len(), 2);
        match &entries[0] {
            WalEntry::Put { key, value } => {
                assert_eq!(key, b"key1");
                assert_eq!(value, b"value1");
            }
            other => panic!("Expected Put entry, got {other:?}"),
        }
        match &entries[1] {
            WalEntry::Delete { key } => assert_eq!(key, b"key2"),
            other => panic!("Expected Delete entry, got {other:?}"),
        }
    }

    #[test]
    fn test_wal_checkpoint() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        let checkpoint_pos = wal.append(&WalEntry::Checkpoint { id: 1 }).unwrap();
        wal.append(&put(b"key2", b"value2")).unwrap();
        wal.sync().unwrap();

        let entries = read_all(&wal, checkpoint_pos);
        assert_eq!(entries.len(), 2);
        match &entries[0] {
            WalEntry::Checkpoint { id } => assert_eq!(*id, 1),
            other => panic!("Expected Checkpoint entry, got {other:?}"),
        }
        assert_eq!(put_key(&entries[1]), b"key2");
    }

    #[test]
    fn test_wal_truncate() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        let truncate_pos = wal.position();
        wal.append(&put(b"key2", b"value2")).unwrap();
        wal.sync().unwrap();

        wal.truncate(truncate_pos).unwrap();
        assert_eq!(wal.position(), truncate_pos);
        let entries = read_all(&wal, 0);
        assert_eq!(put_keys(&entries), [b"key1".as_slice()]);

        // Appending resumes exactly at the truncation point.
        let pos3 = wal.append(&put(b"key3", b"value3")).unwrap();
        assert_eq!(pos3, truncate_pos);
        wal.sync().unwrap();
        let entries = read_all(&wal, 0);
        assert_eq!(put_keys(&entries), [b"key1".as_slice(), b"key3".as_slice()]);
    }

    #[test]
    fn test_wal_reopen_resumes_at_end() {
        let temp_file = NamedTempFile::new().unwrap();
        let end = {
            let mut wal = open_wal(&temp_file);
            wal.append(&put(b"key1", b"value1")).unwrap();
            wal.sync().unwrap();
            wal.position()
        };

        let mut wal = open_wal(&temp_file);
        assert_eq!(wal.position(), end);
        let pos = wal.append(&put(b"key2", b"value2")).unwrap();
        assert_eq!(pos, end);
        wal.sync().unwrap();
        let entries = read_all(&wal, 0);
        assert_eq!(put_keys(&entries), [b"key1".as_slice(), b"key2".as_slice()]);
    }

    #[test]
    fn test_wal_commit_offsets() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        let mut offsets = HashMap::new();
        offsets.insert("topic1".to_string(), 100);
        offsets.insert("topic2".to_string(), 200);
        wal.append(&WalEntry::Commit {
            offsets: offsets.clone(),
            watermark: Some(1000),
        })
        .unwrap();
        wal.sync().unwrap();

        let entries = read_all(&wal, 0);
        assert_eq!(entries.len(), 1);
        match &entries[0] {
            WalEntry::Commit {
                offsets: read_offsets,
                watermark,
            } => {
                assert_eq!(read_offsets.get("topic1"), Some(&100));
                assert_eq!(read_offsets.get("topic2"), Some(&200));
                assert_eq!(*watermark, Some(1000));
            }
            other => panic!("Expected Commit entry, got {other:?}"),
        }
    }

    #[test]
    fn test_wal_crc32_validation() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        wal.sync().unwrap();

        // Offset 10 is inside the payload (the header is 8 bytes).
        overwrite_at(&temp_file, 10, &[0xFF]);

        let mut reader = wal.read_from(0).unwrap();
        match reader.read_next().unwrap() {
            WalReadResult::ChecksumMismatch {
                position,
                expected,
                actual,
            } => {
                assert_eq!(position, 0);
                assert_ne!(expected, actual);
            }
            other => panic!("Expected ChecksumMismatch, got {other:?}"),
        }
    }

    #[test]
    fn test_wal_torn_write_detection_incomplete_header() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        wal.sync().unwrap();
        let valid_pos = wal.position();

        // Three bytes: shorter than the 8-byte header.
        append_raw(&temp_file, &[0x10, 0x00, 0x00]);

        let mut reader = wal.read_from(0).unwrap();
        match reader.read_next().unwrap() {
            WalReadResult::Entry(WalEntry::Put { key, .. }) => assert_eq!(key, b"key1"),
            other => panic!("Expected valid entry, got {other:?}"),
        }
        match reader.read_next().unwrap() {
            WalReadResult::TornWrite { position, .. } => assert_eq!(position, valid_pos),
            other => panic!("Expected TornWrite, got {other:?}"),
        }
    }

    #[test]
    fn test_wal_torn_write_detection_incomplete_data() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        wal.sync().unwrap();
        let valid_pos = wal.position();

        // A complete header announcing 100 bytes, followed by only 10.
        let mut torn = Vec::new();
        torn.extend_from_slice(&100u32.to_le_bytes());
        torn.extend_from_slice(&0x1234_5678u32.to_le_bytes());
        torn.extend_from_slice(&[0u8; 10]);
        append_raw(&temp_file, &torn);

        let mut reader = wal.read_from(0).unwrap();
        match reader.read_next().unwrap() {
            WalReadResult::Entry(WalEntry::Put { key, .. }) => assert_eq!(key, b"key1"),
            other => panic!("Expected valid entry, got {other:?}"),
        }
        match reader.read_next().unwrap() {
            WalReadResult::TornWrite { position, reason } => {
                assert_eq!(position, valid_pos);
                assert!(reason.contains("incomplete data"));
            }
            other => panic!("Expected TornWrite, got {other:?}"),
        }
    }

    #[test]
    fn test_wal_repair() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        wal.append(&put(b"key2", b"value2")).unwrap();
        wal.sync().unwrap();
        let valid_len = wal.position();

        append_raw(&temp_file, &[0xFF, 0xFF, 0xFF]);

        let repaired_len = wal.repair().unwrap();
        assert_eq!(repaired_len, valid_len);
        assert_eq!(wal.position(), valid_len);
        let entries = read_all(&wal, 0);
        assert_eq!(put_keys(&entries), [b"key1".as_slice(), b"key2".as_slice()]);
    }

    #[test]
    fn test_wal_repair_with_crc_corruption() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        wal.sync().unwrap();
        let first_entry_end = wal.position();
        wal.append(&put(b"key2", b"value2")).unwrap();
        wal.sync().unwrap();

        // Clobber the CRC field (bytes 4..8) of the second record's header.
        overwrite_at(&temp_file, first_entry_end + 4, &[0xFF, 0xFF, 0xFF, 0xFF]);

        let repaired_len = wal.repair().unwrap();
        assert_eq!(repaired_len, first_entry_end);
        assert_eq!(wal.position(), first_entry_end);
        let entries = read_all(&wal, 0);
        assert_eq!(entries.len(), 1);
        match &entries[0] {
            WalEntry::Put { key, value } => {
                assert_eq!(key, b"key1");
                assert_eq!(value, b"value1");
            }
            other => panic!("Expected Put entry, got {other:?}"),
        }
    }

    #[test]
    fn test_wal_read_next_vs_iterator() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&put(b"key1", b"value1")).unwrap();
        wal.sync().unwrap();

        let mut reader1 = wal.read_from(0).unwrap();
        match reader1.read_next().unwrap() {
            WalReadResult::Entry(WalEntry::Put { key, .. }) => assert_eq!(key, b"key1"),
            other => panic!("Expected Entry, got {other:?}"),
        }
        match reader1.read_next().unwrap() {
            WalReadResult::Eof => {}
            other => panic!("Expected Eof, got {other:?}"),
        }

        let entries = read_all(&wal, 0);
        assert_eq!(entries.len(), 1);
    }

    #[test]
    fn test_wal_empty_file() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        let mut reader = wal.read_from(0).unwrap();
        match reader.read_next().unwrap() {
            WalReadResult::Eof => {}
            other => panic!("Expected Eof, got {other:?}"),
        }
        // Repairing an empty log is a no-op.
        assert_eq!(wal.repair().unwrap(), 0);
        assert_eq!(wal.position(), 0);
    }

    #[test]
    fn test_wal_watermark_in_commit() {
        let temp_file = NamedTempFile::new().unwrap();
        let mut wal = open_wal(&temp_file);
        wal.append(&WalEntry::Commit {
            offsets: HashMap::new(),
            watermark: None,
        })
        .unwrap();
        wal.append(&WalEntry::Commit {
            offsets: HashMap::new(),
            watermark: Some(12345),
        })
        .unwrap();
        wal.sync().unwrap();

        let entries = read_all(&wal, 0);
        assert_eq!(entries.len(), 2);
        match &entries[0] {
            WalEntry::Commit { watermark, .. } => assert_eq!(*watermark, None),
            other => panic!("Expected Commit, got {other:?}"),
        }
        match &entries[1] {
            WalEntry::Commit { watermark, .. } => assert_eq!(*watermark, Some(12345)),
            other => panic!("Expected Commit, got {other:?}"),
        }
    }
}