use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use uuid::Uuid;
use sochdb_core::{Result, SochDBError};
/// Signature stored in the first 8 bytes of every fenced WAL file.
///
/// Read as big-endian bytes the value spells `\0SOCHDBW`; numerically it is
/// `0x534F43_48444257`.
const WAL_MAGIC: u64 = u64::from_be_bytes(*b"\0SOCHDBW");
/// Number of bytes reserved for the header at the start of the file; the first
/// entry is written at this offset.
pub const WAL_HEADER_SIZE: usize = 64;
/// Format version stamped into every header this module creates.
const WAL_VERSION: u16 = 1;
/// Fixed-size header stored at offset 0 of a fenced WAL file.
///
/// On disk it occupies `WAL_HEADER_SIZE` bytes, every integer little-endian, in this
/// order: `magic` (u64), `version` (u16), `flags` (u16), `epoch` (u64), `writer_id`
/// (16 raw bytes), `last_commit_lsn` (u64), `last_entry_crc` (u32), `entry_count`
/// (u64), `header_crc` (u32), then zero padding.
#[derive(Debug, Clone)]
pub struct WalHeader {
    /// File signature; `read_from` rejects the header unless it equals `WAL_MAGIC`.
    pub magic: u64,
    /// Format version (written as `WAL_VERSION`; not validated on read).
    pub version: u16,
    /// Reserved flag bits; the constructors here always set 0.
    pub flags: u16,
    /// Writer generation; `new_epoch` increments it by one.
    pub epoch: u64,
    /// Randomly generated (v4) identifier of the writer that created this header.
    pub writer_id: Uuid,
    /// Highest LSN recorded via `update_last_commit`.
    pub last_commit_lsn: u64,
    /// CRC of the most recently recorded entry.
    pub last_entry_crc: u32,
    /// Number of entries recorded via `update_last_entry_crc`.
    pub entry_count: u64,
    /// Checksum as read from disk; `write_to` recomputes it instead of using this value.
    pub header_crc: u32,
}

impl WalHeader {
    /// Shared constructor: current magic and version, zero flags, a newly generated
    /// writer id and an entry count of zero.
    fn fresh(epoch: u64, last_commit_lsn: u64, last_entry_crc: u32) -> Self {
        Self {
            magic: WAL_MAGIC,
            version: WAL_VERSION,
            flags: 0,
            epoch,
            writer_id: Uuid::new_v4(),
            last_commit_lsn,
            last_entry_crc,
            entry_count: 0,
            header_crc: 0,
        }
    }

    /// Header for a brand-new log: epoch 1, nothing committed, nothing appended.
    pub fn new() -> Self {
        Self::fresh(1, 0, 0)
    }

    /// Header for the generation that follows `previous`.
    ///
    /// The epoch is bumped by one and a new writer id is drawn. The commit LSN and last
    /// entry CRC are carried over, and `entry_count` starts again at zero.
    pub fn new_epoch(previous: &WalHeader) -> Self {
        Self::fresh(
            previous.epoch + 1,
            previous.last_commit_lsn,
            previous.last_entry_crc,
        )
    }

    /// CRC32 over every field except `header_crc`, each in its little-endian encoding
    /// (the writer id as its raw 16 bytes).
    fn compute_crc(&self) -> u32 {
        let mut hasher = crc32fast::Hasher::new();
        for bytes in [
            &self.magic.to_le_bytes()[..],
            &self.version.to_le_bytes()[..],
            &self.flags.to_le_bytes()[..],
            &self.epoch.to_le_bytes()[..],
            &self.writer_id.as_bytes()[..],
            &self.last_commit_lsn.to_le_bytes()[..],
            &self.last_entry_crc.to_le_bytes()[..],
            &self.entry_count.to_le_bytes()[..],
        ] {
            hasher.update(bytes);
        }
        hasher.finalize()
    }

    /// Reads and validates the header at the start of `file`.
    ///
    /// # Errors
    /// Returns `Corruption` if the magic number is wrong (checked before reading any
    /// further) or if the stored CRC does not match the recomputed one. Short reads
    /// surface as I/O errors.
    pub fn read_from(file: &mut File) -> Result<Self> {
        file.seek(SeekFrom::Start(0))?;

        let magic = file.read_u64::<LittleEndian>()?;
        if magic != WAL_MAGIC {
            return Err(SochDBError::Corruption(format!(
                "Invalid WAL magic: expected {:x}, got {:x}",
                WAL_MAGIC, magic
            )));
        }

        // Struct-literal fields are evaluated in the order written, which matches the
        // on-disk layout.
        let header = Self {
            magic,
            version: file.read_u16::<LittleEndian>()?,
            flags: file.read_u16::<LittleEndian>()?,
            epoch: file.read_u64::<LittleEndian>()?,
            writer_id: {
                let mut raw_id = [0u8; 16];
                file.read_exact(&mut raw_id)?;
                Uuid::from_bytes(raw_id)
            },
            last_commit_lsn: file.read_u64::<LittleEndian>()?,
            last_entry_crc: file.read_u32::<LittleEndian>()?,
            entry_count: file.read_u64::<LittleEndian>()?,
            header_crc: file.read_u32::<LittleEndian>()?,
        };

        let expected = header.compute_crc();
        if expected == header.header_crc {
            Ok(header)
        } else {
            Err(SochDBError::Corruption(format!(
                "WAL header CRC mismatch: expected {:x}, got {:x}",
                expected, header.header_crc
            )))
        }
    }

    /// Serializes the header (with a freshly computed CRC) to offset 0 of `file`,
    /// zero-pads it to `WAL_HEADER_SIZE` bytes and fsyncs the file.
    pub fn write_to(&self, file: &mut File) -> Result<()> {
        let mut encoded: Vec<u8> = Vec::with_capacity(WAL_HEADER_SIZE);
        encoded.write_u64::<LittleEndian>(self.magic)?;
        encoded.write_u16::<LittleEndian>(self.version)?;
        encoded.write_u16::<LittleEndian>(self.flags)?;
        encoded.write_u64::<LittleEndian>(self.epoch)?;
        encoded.extend_from_slice(self.writer_id.as_bytes());
        encoded.write_u64::<LittleEndian>(self.last_commit_lsn)?;
        encoded.write_u32::<LittleEndian>(self.last_entry_crc)?;
        encoded.write_u64::<LittleEndian>(self.entry_count)?;
        encoded.write_u32::<LittleEndian>(self.compute_crc())?;
        // The meaningful fields take 60 bytes; the rest of the fixed-size header is zeros.
        encoded.resize(WAL_HEADER_SIZE, 0);

        file.seek(SeekFrom::Start(0))?;
        file.write_all(&encoded)?;
        file.sync_all()?;
        Ok(())
    }

    /// Records `crc` as the latest entry's CRC and counts one more entry.
    pub fn update_last_entry_crc(&mut self, crc: u32) {
        self.last_entry_crc = crc;
        self.entry_count += 1;
    }

    /// Records `lsn` as the commit watermark.
    pub fn update_last_commit(&mut self, lsn: u64) {
        self.last_commit_lsn = lsn;
    }
}

impl Default for WalHeader {
    /// Same as [`WalHeader::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// One record in a fenced WAL.
///
/// Serialized little-endian as `lsn` (u64), `prev_crc` (u32), `epoch` (u64), payload
/// length (u32), the payload bytes, and finally `crc` (u32).
#[derive(Debug, Clone)]
pub struct FencedWalEntry {
    /// Log sequence number assigned by the writer.
    pub lsn: u64,
    /// CRC of the preceding entry, linking records into a chain.
    pub prev_crc: u32,
    /// Epoch of the writer that produced this entry.
    pub epoch: u64,
    /// Caller-supplied bytes.
    pub payload: Vec<u8>,
    /// CRC32 over `lsn`, `prev_crc`, `epoch`, the payload length and the payload.
    pub crc: u32,
}

impl FencedWalEntry {
    /// Bytes preceding the payload: lsn + prev_crc + epoch + payload length.
    const HEADER_SIZE: usize = 8 + 4 + 8 + 4;
    /// Bytes following the payload: the trailing CRC.
    const FOOTER_SIZE: usize = 4;
    /// Upper bound on the buffer `read_from` reserves before any payload bytes arrive.
    const MAX_PAYLOAD_PREALLOC: usize = 64 * 1024;

    /// Builds an entry and computes its CRC.
    ///
    /// The payload length is encoded as a `u32`, so payloads longer than `u32::MAX`
    /// bytes cannot be represented faithfully; callers must reject them.
    pub fn new(lsn: u64, prev_crc: u32, epoch: u64, payload: Vec<u8>) -> Self {
        let mut entry = Self {
            lsn,
            prev_crc,
            epoch,
            payload,
            crc: 0,
        };
        entry.crc = entry.compute_crc();
        entry
    }

    /// CRC32 over the serialized fields that precede the trailing CRC.
    fn compute_crc(&self) -> u32 {
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&self.lsn.to_le_bytes());
        hasher.update(&self.prev_crc.to_le_bytes());
        hasher.update(&self.epoch.to_le_bytes());
        hasher.update(&(self.payload.len() as u32).to_le_bytes());
        hasher.update(&self.payload);
        hasher.finalize()
    }

    /// Serializes the entry into its on-disk byte form.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(self.size());
        buf.extend_from_slice(&self.lsn.to_le_bytes());
        buf.extend_from_slice(&self.prev_crc.to_le_bytes());
        buf.extend_from_slice(&self.epoch.to_le_bytes());
        buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
        buf.extend_from_slice(&self.payload);
        buf.extend_from_slice(&self.crc.to_le_bytes());
        buf
    }

    /// Reads one entry from `reader` and verifies its CRC.
    ///
    /// # Errors
    /// - An I/O error of kind `UnexpectedEof` if the input ends before the entry is
    ///   complete, including a payload shorter than its declared length.
    /// - `Corruption` if the stored CRC does not match the recomputed one.
    pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
        let lsn = reader.read_u64::<LittleEndian>()?;
        let prev_crc = reader.read_u32::<LittleEndian>()?;
        let epoch = reader.read_u64::<LittleEndian>()?;
        let payload_len = reader.read_u32::<LittleEndian>()? as usize;

        // The length comes straight from disk and is not CRC-verified yet. A torn or
        // corrupt record can claim up to 4 GiB, so do not preallocate the claimed size.
        // Read through `take` and let the buffer grow only as real bytes arrive.
        let mut payload = Vec::with_capacity(payload_len.min(Self::MAX_PAYLOAD_PREALLOC));
        reader
            .by_ref()
            .take(payload_len as u64)
            .read_to_end(&mut payload)?;
        if payload.len() < payload_len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "WAL entry payload truncated",
            )
            .into());
        }

        let crc = reader.read_u32::<LittleEndian>()?;
        let entry = Self {
            lsn,
            prev_crc,
            epoch,
            payload,
            crc,
        };
        let computed_crc = entry.compute_crc();
        if computed_crc != crc {
            return Err(SochDBError::Corruption(format!(
                "WAL entry CRC mismatch at LSN {}: expected {:x}, got {:x}",
                lsn, computed_crc, crc
            )));
        }
        Ok(entry)
    }

    /// Number of bytes `to_bytes` produces for this entry.
    pub fn size(&self) -> usize {
        Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE
    }
}
/// Append-only write-ahead log fenced by a per-open epoch.
///
/// Every [`FencedWal::open`] claims a new epoch and writer id in the file header before
/// recovering the existing entries. Entries are chained by CRC: each entry's `prev_crc`
/// equals the `crc` of the entry before it. A broken or damaged tail is therefore
/// detected and truncated during recovery.
pub struct FencedWal {
    /// Filesystem location of the log.
    path: PathBuf,
    /// In-memory header; persisted by [`FencedWal::sync`] and [`FencedWal::commit`].
    header: WalHeader,
    /// Read/write handle to the log file.
    file: File,
    /// Byte offset at which the next entry will be written.
    write_pos: u64,
}

/// What recovery learned about the valid prefix of an existing log.
struct RecoveredTail {
    /// Offset just past the last entry that passed the CRC and chain checks.
    write_pos: u64,
    /// CRC of that last valid entry (0 when no entries survived).
    last_crc: u32,
    /// Number of valid entries found.
    entry_count: u64,
}

impl FencedWal {
    /// Opens, or creates, the log at `path` and claims a new writer epoch.
    ///
    /// - **New or header-less file:** a fresh epoch-1 header is written.
    /// - **Existing log:**
    ///   1. The bumped epoch is persisted first.
    ///   2. The entries are scanned, and everything after the last entry that passes
    ///      its CRC and chain checks is truncated.
    ///   3. The header's entry count and last-entry CRC are rebuilt from what
    ///      survived, so new LSNs continue after the recovered ones and the CRC chain
    ///      continues from the real last entry.
    ///
    /// # Errors
    /// - I/O failures.
    /// - A corrupt header (`Corruption`).
    /// - An entry stamped with an epoch newer than the stored header's (`SplitBrain`).
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file_exists = path.exists();
        let mut file = OpenOptions::new()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .open(&path)?;
        let has_header = file_exists && file.metadata()?.len() >= WAL_HEADER_SIZE as u64;

        let (header, write_pos) = if has_header {
            let existing_header = WalHeader::read_from(&mut file)?;
            // Persist the new epoch before scanning, so an older writer is fenced even
            // if recovery below fails.
            let mut header = WalHeader::new_epoch(&existing_header);
            header.write_to(&mut file)?;

            let tail = Self::find_write_position(&mut file, &existing_header)?;
            // `new_epoch` resets `entry_count` and copies `last_entry_crc` from the old
            // header. That copy is stale if entries were appended after the last sync,
            // or if the tail was just truncated. Without these assignments the next
            // append would reuse LSN 1 and break the CRC chain.
            header.entry_count = tail.entry_count;
            header.last_entry_crc = tail.last_crc;
            if header.last_commit_lsn > tail.entry_count {
                // Committed entries were lost to truncation. Keep the watermark from
                // covering LSNs that future appends will reuse.
                eprintln!(
                    "WAL commit LSN {} exceeds recovered entry count {}; clamping",
                    header.last_commit_lsn, tail.entry_count
                );
                header.last_commit_lsn = tail.entry_count;
            }
            // Rewriting the header also fsyncs the truncation done during recovery.
            header.write_to(&mut file)?;
            (header, tail.write_pos)
        } else {
            let header = WalHeader::new();
            header.write_to(&mut file)?;
            (header, WAL_HEADER_SIZE as u64)
        };

        Ok(Self {
            path,
            header,
            file,
            write_pos,
        })
    }

    /// Path of the underlying log file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Scans entries starting right after the header.
    ///
    /// Stops at end of file, at the first CRC failure, or at the first chain break.
    /// The file is then truncated at that point and the cursor left there.
    ///
    /// # Errors
    /// `SplitBrain` if an entry carries an epoch newer than `header.epoch`.
    fn find_write_position(file: &mut File, header: &WalHeader) -> Result<RecoveredTail> {
        file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
        let mut tail = RecoveredTail {
            write_pos: WAL_HEADER_SIZE as u64,
            last_crc: 0,
            entry_count: 0,
        };
        loop {
            match FencedWalEntry::read_from(file) {
                Ok(entry) => {
                    if tail.entry_count > 0 && entry.prev_crc != tail.last_crc {
                        eprintln!(
                            "WAL chain broken at LSN {}: expected prev_crc {:x}, got {:x}",
                            entry.lsn, tail.last_crc, entry.prev_crc
                        );
                        break;
                    }
                    if entry.epoch > header.epoch {
                        return Err(SochDBError::SplitBrain(format!(
                            "Entry has future epoch {} > header epoch {}",
                            entry.epoch, header.epoch
                        )));
                    }
                    tail.last_crc = entry.crc;
                    tail.write_pos += entry.size() as u64;
                    tail.entry_count += 1;
                }
                // A short read means the log ends here (possibly mid-entry).
                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                // A CRC mismatch marks a damaged record; treat it as the end of the log.
                Err(SochDBError::Corruption(_)) => {
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        file.set_len(tail.write_pos)?;
        file.seek(SeekFrom::Start(tail.write_pos))?;
        Ok(tail)
    }

    /// Appends `payload` as the next entry and returns its LSN (`entry_count + 1`).
    ///
    /// The bytes are written but not synced. The on-disk header is not updated until
    /// [`FencedWal::sync`] or [`FencedWal::commit`].
    ///
    /// # Errors
    /// - An `InvalidInput` I/O error if the payload does not fit the entry's `u32`
    ///   length field.
    /// - Any seek or write failure.
    pub fn append(&mut self, payload: Vec<u8>) -> Result<u64> {
        if payload.len() > u32::MAX as usize {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "WAL payload of {} bytes exceeds the u32 length field",
                    payload.len()
                ),
            )
            .into());
        }
        let lsn = self.header.entry_count + 1;
        let entry =
            FencedWalEntry::new(lsn, self.header.last_entry_crc, self.header.epoch, payload);
        let bytes = entry.to_bytes();
        self.file.seek(SeekFrom::Start(self.write_pos))?;
        self.file.write_all(&bytes)?;
        self.write_pos += bytes.len() as u64;
        self.header.update_last_entry_crc(entry.crc);
        Ok(lsn)
    }

    /// Makes appended entries durable, then rewrites and fsyncs the header.
    ///
    /// Data is synced before the header so the persisted header never describes
    /// entries that are not yet on disk.
    pub fn sync(&mut self) -> Result<()> {
        self.file.sync_all()?;
        self.header.write_to(&mut self.file)?;
        Ok(())
    }

    /// Sets the commit watermark to `lsn` and syncs data and header.
    pub fn commit(&mut self, lsn: u64) -> Result<()> {
        self.header.update_last_commit(lsn);
        self.sync()
    }

    /// Epoch claimed by this writer.
    pub fn epoch(&self) -> u64 {
        self.header.epoch
    }

    /// Identifier generated for this writer when the log was opened.
    pub fn writer_id(&self) -> Uuid {
        self.header.writer_id
    }

    /// Current commit watermark.
    pub fn last_commit_lsn(&self) -> u64 {
        self.header.last_commit_lsn
    }

    /// Number of entries in the log, which is also the LSN of the newest one.
    pub fn entry_count(&self) -> u64 {
        self.header.entry_count
    }

    /// Calls `callback` for every entry in file order and returns how many were visited.
    ///
    /// # Errors
    /// - `Corruption` on a CRC mismatch or a broken `prev_crc` chain.
    /// - Any error returned by `callback`, which stops the replay.
    pub fn replay<F>(&mut self, mut callback: F) -> Result<u64>
    where
        F: FnMut(&FencedWalEntry) -> Result<()>,
    {
        self.file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
        let mut prev_crc = 0u32;
        let mut count = 0u64;
        loop {
            match FencedWalEntry::read_from(&mut self.file) {
                Ok(entry) => {
                    if count > 0 && entry.prev_crc != prev_crc {
                        return Err(SochDBError::Corruption(format!(
                            "Chain broken at LSN {}: expected {:x}, got {:x}",
                            entry.lsn, prev_crc, entry.prev_crc
                        )));
                    }
                    callback(&entry)?;
                    prev_crc = entry.crc;
                    count += 1;
                }
                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        Ok(count)
    }

    /// Like [`FencedWal::replay`], but only passes entries with
    /// `lsn <= last_commit_lsn`. Returns how many entries were passed.
    pub fn replay_committed<F>(&mut self, mut callback: F) -> Result<u64>
    where
        F: FnMut(&FencedWalEntry) -> Result<()>,
    {
        let commit_lsn = self.header.last_commit_lsn;
        let mut committed_count = 0u64;
        self.replay(|entry| {
            if entry.lsn <= commit_lsn {
                callback(entry)?;
                committed_count += 1;
            }
            Ok(())
        })?;
        Ok(committed_count)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a scratch directory and a WAL path inside it.
    ///
    /// Keep the returned `TempDir` alive while the path is in use; dropping it deletes
    /// the directory.
    fn scratch_wal() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        (dir, wal_path)
    }

    #[test]
    fn test_header_roundtrip() {
        let (_dir, path) = scratch_wal();
        let written = WalHeader::new();
        written.write_to(&mut File::create(&path).unwrap()).unwrap();

        let read_back = WalHeader::read_from(&mut File::open(&path).unwrap()).unwrap();
        assert_eq!(read_back.magic, written.magic);
        assert_eq!(read_back.epoch, written.epoch);
        assert_eq!(read_back.writer_id, written.writer_id);
    }

    #[test]
    fn test_epoch_increment() {
        let (_dir, path) = scratch_wal();
        let (first_epoch, first_writer) = {
            let wal = FencedWal::open(&path).unwrap();
            (wal.epoch(), wal.writer_id())
        };

        let reopened = FencedWal::open(&path).unwrap();
        assert_eq!(reopened.epoch(), first_epoch + 1);
        assert_ne!(reopened.writer_id(), first_writer);
    }

    #[test]
    fn test_entry_chain() {
        let (_dir, path) = scratch_wal();
        let mut wal = FencedWal::open(&path).unwrap();
        for name in ["entry1", "entry2", "entry3"] {
            wal.append(name.as_bytes().to_vec()).unwrap();
        }
        wal.sync().unwrap();

        let mut replayed: Vec<Vec<u8>> = Vec::new();
        wal.replay(|entry| {
            replayed.push(entry.payload.clone());
            Ok(())
        })
        .unwrap();
        assert_eq!(
            replayed,
            vec![b"entry1".to_vec(), b"entry2".to_vec(), b"entry3".to_vec()]
        );
    }

    #[test]
    fn test_commit_replay() {
        let (_dir, path) = scratch_wal();
        {
            let mut writer = FencedWal::open(&path).unwrap();
            writer.append(b"committed1".to_vec()).unwrap();
            writer.append(b"committed2".to_vec()).unwrap();
            writer.commit(2).unwrap();
            writer.append(b"uncommitted".to_vec()).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = FencedWal::open(&path).unwrap();
        let mut committed: Vec<Vec<u8>> = Vec::new();
        reader
            .replay_committed(|entry| {
                committed.push(entry.payload.clone());
                Ok(())
            })
            .unwrap();
        assert_eq!(
            committed,
            vec![b"committed1".to_vec(), b"committed2".to_vec()]
        );
    }
}