use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, Read, Seek, SeekFrom, Write},
path::PathBuf,
};
use bincode::{config::standard, decode_from_slice, encode_to_vec};
use blake3::Hasher;
use bincode::{Decode, Encode};
use crate::infinitedb_core::{
address::{Address, RevisionId, SpaceId},
block::BlockId,
snapshot::SnapshotId,
};
/// One record stored in the write-ahead log.
///
/// Values of this type are serialized with bincode by the writer in this file
/// and decoded again by the reader.
/// NOTE(review): the derived encoding presumably depends on variant and field
/// order, so reordering anything here would change the on-disk format — confirm
/// before editing.
#[derive(Debug, Clone, Encode, Decode)]
pub enum WalEntry {
/// Carries the `data` bytes for `address` at `revision`
/// (presumably a value write; semantics are defined by the consumer).
Write {
address: Address,
revision: RevisionId,
data: Vec<u8>,
},
/// Carries an `address` and `revision` without data
/// (presumably marks a deletion — verify against the replay code).
Tombstone {
address: Address,
revision: RevisionId,
},
/// Identifies a block by `block_id` together with its `space` and `snapshot`
/// (presumably logged when the block is sealed).
BlockSealed {
block_id: BlockId,
space: SpaceId,
snapshot: SnapshotId,
},
/// Carries only a `revision` (presumably a checkpoint marker).
Checkpoint { revision: RevisionId },
}
/// Policy that controls how often appended frames are synced to disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalDurability {
    /// Sync after every appended frame.
    Strict,
    /// Sync once `sync_every` frames have accumulated since the last sync.
    Buffered { sync_every: usize },
}

impl WalDurability {
    /// Returns the number of frames that may accumulate before a sync is due.
    ///
    /// The result is always at least 1: `Strict` yields 1, and a `Buffered`
    /// interval of 0 is clamped up to 1.
    pub fn sync_every(self) -> usize {
        let interval = match self {
            Self::Strict => 1,
            Self::Buffered { sync_every } => sync_every,
        };
        // A zero interval is treated the same as syncing on every frame.
        interval.max(1)
    }
}
/// Appending writer for the write-ahead log file.
///
/// Every frame is laid out as
/// `payload length (u64, little-endian) | bincode payload | 32-byte BLAKE3 of payload`.
pub struct WalWriter {
    /// Buffered handle to the log file, opened in append mode.
    writer: BufWriter<File>,
    /// Location of the log file; used by [`WalWriter::rewrite`] to reopen it.
    path: PathBuf,
    /// Policy deciding when [`WalWriter::append`] syncs.
    durability: WalDurability,
    /// Number of frames appended since the last successful [`WalWriter::sync`].
    pending_since_sync: usize,
}

impl WalWriter {
    /// Opens (creating if necessary) the log at `path` with [`WalDurability::Strict`].
    ///
    /// # Errors
    /// Returns the I/O error raised while opening the file.
    pub fn open(path: PathBuf) -> io::Result<Self> {
        Self::open_with_durability(path, WalDurability::Strict)
    }

    /// Opens (creating if necessary) the log at `path` in append mode with the
    /// given durability policy.
    ///
    /// # Errors
    /// Returns the I/O error raised while opening the file.
    pub fn open_with_durability(path: PathBuf, durability: WalDurability) -> io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)?;
        Ok(Self {
            writer: BufWriter::new(file),
            path,
            durability,
            pending_since_sync: 0,
        })
    }

    /// Replaces the durability policy; takes effect on the next [`WalWriter::append`].
    pub fn set_durability(&mut self, durability: WalDurability) {
        self.durability = durability;
    }

    /// Returns the current durability policy.
    pub fn durability(&self) -> WalDurability {
        self.durability
    }

    /// Returns how many frames have been appended since the last sync.
    pub fn pending_frames(&self) -> usize {
        self.pending_since_sync
    }

    /// Encodes `entry` and writes one frame into the internal buffer.
    ///
    /// This neither flushes nor syncs; call [`WalWriter::sync`] (or use
    /// [`WalWriter::append`]) to make the frame durable.
    ///
    /// # Errors
    /// Returns an `io::ErrorKind::Other` error if encoding fails, or the I/O
    /// error raised while writing.
    pub fn append_frame(&mut self, entry: &WalEntry) -> io::Result<()> {
        let payload = encode_to_vec(entry, standard())
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        let len = payload.len() as u64;
        let checksum = blake3_hash(&payload);
        self.writer.write_all(&len.to_le_bytes())?;
        self.writer.write_all(&payload)?;
        self.writer.write_all(&checksum)?;
        self.pending_since_sync += 1;
        Ok(())
    }

    /// Flushes the buffer, syncs the file to disk and resets the pending counter.
    ///
    /// # Errors
    /// Returns the I/O error raised by the flush or by `sync_all`; the pending
    /// counter is left untouched in that case.
    pub fn sync(&mut self) -> io::Result<()> {
        self.writer.flush()?;
        self.writer.get_ref().sync_all()?;
        self.pending_since_sync = 0;
        Ok(())
    }

    /// Appends `entry` and syncs once the durability policy's threshold is reached.
    ///
    /// # Errors
    /// Returns any error from [`WalWriter::append_frame`] or [`WalWriter::sync`].
    pub fn append(&mut self, entry: &WalEntry) -> io::Result<()> {
        self.append_frame(entry)?;
        let every = self.durability.sync_every();
        if self.pending_since_sync >= every {
            self.sync()?;
        }
        Ok(())
    }

    /// Replaces the entire log with `entries`, then syncs once.
    ///
    /// Frames that were appended but still sitting in the write buffer are
    /// discarded: after this call the file holds exactly `entries`.
    ///
    /// The file is truncated in place, so a crash before the final sync can
    /// leave the log empty or holding only a prefix of `entries`.
    ///
    /// # Errors
    /// Returns the I/O error raised while truncating, reopening, writing or
    /// syncing the file.
    pub fn rewrite(&mut self, entries: &[WalEntry]) -> io::Result<()> {
        let truncated = OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(&self.path)?;
        truncated.sync_all()?;
        drop(truncated);

        let file = OpenOptions::new().create(true).append(true).open(&self.path)?;
        let stale = std::mem::replace(&mut self.writer, BufWriter::new(file));
        // Dropping a `BufWriter` flushes whatever it still buffers. The old
        // handle points at the file that was just truncated, so a plain drop
        // would write superseded frames ahead of `entries`. `into_parts` hands
        // the buffer back unflushed so it can be thrown away instead.
        let (_old_file, _unflushed) = stale.into_parts();

        self.pending_since_sync = 0;
        for entry in entries {
            self.append_frame(entry)?;
        }
        self.sync()
    }

    /// Returns the path of the log file.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }
}
/// Reader that decodes frames from a write-ahead log file.
pub struct WalReader {
    /// Read-only handle to the log file.
    file: File,
}

impl WalReader {
    /// Opens the log at `path` for reading.
    ///
    /// # Errors
    /// Returns the I/O error raised while opening the file.
    pub fn open(path: PathBuf) -> io::Result<Self> {
        let file = File::open(path)?;
        Ok(Self { file })
    }

    /// Reads the log from the start and returns every entry up to the first
    /// frame that is incomplete or invalid.
    ///
    /// The scan stops, without reporting an error, at the first frame that
    /// - is cut short anywhere (length header, payload or checksum), as left
    ///   behind by an interrupted write,
    /// - fails its BLAKE3 checksum, or
    /// - cannot be decoded as a [`WalEntry`].
    ///
    /// # Errors
    /// Returns any I/O error other than hitting the end of the file.
    pub fn entries(&mut self) -> io::Result<Vec<WalEntry>> {
        self.file.seek(SeekFrom::Start(0))?;
        // Frames are small; buffering avoids several syscalls per frame.
        let mut reader = io::BufReader::new(&mut self.file);

        let mut out = Vec::new();
        let mut len_buf = [0u8; 8];
        let mut checksum_buf = [0u8; 32];
        let mut payload = Vec::new();

        loop {
            if !Self::fill_or_eof(&mut reader, &mut len_buf)? {
                break;
            }
            let len = u64::from_le_bytes(len_buf);

            // The length comes from disk and is not yet verified, so never
            // allocate it up front: `take` + `read_to_end` grows the buffer
            // only as far as bytes actually exist in the file.
            payload.clear();
            let got = reader.by_ref().take(len).read_to_end(&mut payload)?;
            if got as u64 != len {
                // Payload cut short: torn tail write or a corrupt length.
                break;
            }

            if !Self::fill_or_eof(&mut reader, &mut checksum_buf)? {
                break;
            }
            if blake3_hash(&payload) != checksum_buf {
                break;
            }

            match decode_from_slice::<WalEntry, _>(&payload, standard()) {
                Ok((entry, _)) => out.push(entry),
                Err(_) => break,
            }
        }
        Ok(out)
    }

    /// Fills `buf` completely, returning `Ok(false)` if the input ended first.
    fn fill_or_eof(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<bool> {
        match reader.read_exact(buf) {
            Ok(()) => Ok(true),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
            Err(e) => Err(e),
        }
    }
}
/// Computes the 32-byte BLAKE3 digest of `data`.
fn blake3_hash(data: &[u8]) -> [u8; 32] {
    let digest = Hasher::new().update(data).finalize();
    *digest.as_bytes()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infinitedb_core::address::{DimensionVector, SpaceId};
    use tempfile::NamedTempFile;

    /// Builds a small `Write` entry used as a fixture by the tests below.
    fn sample_entry() -> WalEntry {
        WalEntry::Write {
            address: Address::new(
                SpaceId(1),
                DimensionVector::new(vec![10, 20]),
            ),
            revision: RevisionId::legacy(1),
            data: vec![1, 2, 3],
        }
    }

    /// A single appended entry can be read back.
    #[test]
    fn roundtrip_single_entry() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();
        drop(writer);

        let mut reader = WalReader::open(path).unwrap();
        let entries = reader.entries().unwrap();
        assert_eq!(entries.len(), 1);
        assert!(matches!(entries[0], WalEntry::Write { .. }));
    }

    /// Several appended entries are all read back.
    #[test]
    fn multiple_entries_roundtrip() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();
        writer.append(&WalEntry::Checkpoint { revision: RevisionId::legacy(1) }).unwrap();
        drop(writer);

        let mut reader = WalReader::open(path).unwrap();
        let entries = reader.entries().unwrap();
        assert_eq!(entries.len(), 2);
    }

    /// `append` in buffered mode leaves frames pending until the configured
    /// threshold is reached, then syncs and resets the counter.
    #[test]
    fn buffered_syncs_periodically() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open_with_durability(
            path.clone(),
            WalDurability::Buffered { sync_every: 4 },
        )
        .unwrap();

        // Below the threshold nothing is synced, so the counter keeps growing.
        for expected_pending in 1..=3 {
            writer.append(&sample_entry()).unwrap();
            assert_eq!(writer.pending_frames(), expected_pending);
        }

        // The fourth frame reaches the threshold and triggers the sync.
        writer.append(&sample_entry()).unwrap();
        assert_eq!(writer.pending_frames(), 0);
        drop(writer);

        let mut reader = WalReader::open(path).unwrap();
        assert_eq!(reader.entries().unwrap().len(), 4);
    }

    /// `rewrite` replaces the previous contents and leaves nothing pending.
    #[test]
    fn rewrite_uses_single_sync() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();

        let entries = vec![
            sample_entry(),
            WalEntry::Checkpoint { revision: RevisionId::legacy(2) },
        ];
        writer.rewrite(&entries).unwrap();
        assert_eq!(writer.pending_frames(), 0);
        drop(writer);

        let mut reader = WalReader::open(path).unwrap();
        let read_back = reader.entries().unwrap();
        assert_eq!(read_back.len(), 2);
        assert!(matches!(read_back[0], WalEntry::Write { .. }));
        assert!(matches!(read_back[1], WalEntry::Checkpoint { .. }));
    }
}