use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, Read, Seek, SeekFrom, Write},
path::PathBuf,
};
use bincode::{config::standard, decode_from_slice, encode_to_vec};
use blake3::Hasher;
use bincode::{Decode, Encode};
use crate::infinitedb_core::{
address::{Address, RevisionId, SpaceId},
block::BlockId,
snapshot::SnapshotId,
};
/// A single logical record in the write-ahead log.
///
/// Entries are serialized with bincode's `standard()` configuration by [`WalWriter`] and
/// decoded by [`WalReader`].
///
/// NOTE(review): the derived bincode encoding presumably identifies variants by their
/// declaration order, so reordering variants (or inserting one anywhere but the end) would
/// likely make existing log files undecodable — confirm before changing this enum.
#[derive(Debug, Clone, Encode, Decode)]
pub enum WalEntry {
    /// Carries a raw `data` payload for `address`, tagged with `revision`.
    Write {
        address: Address,
        revision: RevisionId,
        data: Vec<u8>,
    },
    /// Deletion marker for `address` at `revision` (presumably; the exact semantics live in
    /// the replaying code — confirm).
    Tombstone {
        address: Address,
        revision: RevisionId,
    },
    /// Records that block `block_id` in `space` was sealed, along with a `snapshot` id.
    BlockSealed {
        block_id: BlockId,
        space: SpaceId,
        snapshot: SnapshotId,
    },
    /// Checkpoint marker tagged with `revision`.
    Checkpoint { revision: RevisionId },
}
pub struct WalWriter {
writer: BufWriter<File>,
path: PathBuf,
}
impl WalWriter {
pub fn open(path: PathBuf) -> io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&path)?;
Ok(Self {
writer: BufWriter::new(file),
path,
})
}
pub fn append(&mut self, entry: &WalEntry) -> io::Result<()> {
let payload = encode_to_vec(entry, standard())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let len = payload.len() as u64;
let checksum = blake3_hash(&payload);
self.writer.write_all(&len.to_le_bytes())?;
self.writer.write_all(&payload)?;
self.writer.write_all(&checksum)?;
self.writer.flush()?;
self.writer.get_ref().sync_all()
}
pub fn rewrite(&mut self, entries: &[WalEntry]) -> io::Result<()> {
let truncated = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&self.path)?;
truncated.sync_all()?;
drop(truncated);
let file = OpenOptions::new().create(true).append(true).open(&self.path)?;
self.writer = BufWriter::new(file);
for entry in entries {
self.append(entry)?;
}
Ok(())
}
pub fn path(&self) -> &PathBuf {
&self.path
}
}
/// Reader that replays the intact prefix of a write-ahead log file.
pub struct WalReader {
    file: File,
}

impl WalReader {
    /// Width in bytes of the little-endian payload-length prefix.
    const LEN_PREFIX: usize = 8;
    /// Width in bytes of the trailing BLAKE3 checksum.
    const CHECKSUM_LEN: usize = 32;

    /// Opens the log at `path` for reading.
    ///
    /// # Errors
    /// Returns any I/O error raised while opening the file (including a missing file).
    pub fn open(path: PathBuf) -> io::Result<Self> {
        let file = File::open(path)?;
        Ok(Self { file })
    }

    /// Returns every valid entry from the start of the log, in the order they were written.
    ///
    /// Replay stops without error at the first record that:
    /// - is incomplete (a torn tail left by a crash mid-append);
    /// - declares a payload longer than the bytes remaining in the file;
    /// - fails its checksum; or
    /// - fails to decode.
    ///
    /// The whole file is read into memory before parsing.
    ///
    /// # Errors
    /// Returns an I/O error only if seeking or reading the file fails.
    pub fn entries(&mut self) -> io::Result<Vec<WalEntry>> {
        self.file.seek(SeekFrom::Start(0))?;
        let mut bytes = Vec::new();
        self.file.read_to_end(&mut bytes)?;
        Ok(Self::parse_records(&bytes))
    }

    /// Decodes consecutive framed records from `rest`, stopping at the first invalid one.
    fn parse_records(mut rest: &[u8]) -> Vec<WalEntry> {
        let mut out = Vec::new();
        while rest.len() >= Self::LEN_PREFIX {
            let (len_bytes, tail) = rest.split_at(Self::LEN_PREFIX);
            let mut len_buf = [0u8; 8];
            len_buf.copy_from_slice(len_bytes);
            let declared = u64::from_le_bytes(len_buf);

            // The length prefix is not covered by the checksum, so validate it against the
            // bytes actually present before trusting it. A torn or corrupt prefix ends replay
            // instead of triggering a huge allocation or an EOF error.
            let Some(max_payload) = tail.len().checked_sub(Self::CHECKSUM_LEN) else {
                break;
            };
            let payload_len = match usize::try_from(declared) {
                Ok(n) if n <= max_payload => n,
                _ => break,
            };

            let (payload, tail) = tail.split_at(payload_len);
            let (checksum, tail) = tail.split_at(Self::CHECKSUM_LEN);
            if blake3_hash(payload).as_slice() != checksum {
                break;
            }
            match decode_from_slice::<WalEntry, _>(payload, standard()) {
                Ok((entry, _)) => out.push(entry),
                Err(_) => break,
            }
            rest = tail;
        }
        out
    }
}
/// Returns the 32-byte BLAKE3 digest of `data`; this is the per-record WAL checksum.
fn blake3_hash(data: &[u8]) -> [u8; 32] {
    let digest = Hasher::new().update(data).finalize();
    *digest.as_bytes()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infinitedb_core::address::{DimensionVector, SpaceId};
    use tempfile::NamedTempFile;

    /// A small `Write` entry shared by the tests.
    fn sample_entry() -> WalEntry {
        WalEntry::Write {
            address: Address::new(SpaceId(1), DimensionVector::new(vec![10, 20])),
            revision: RevisionId(1),
            data: vec![1, 2, 3],
        }
    }

    /// Replays every entry currently stored in the log at `path`.
    fn read_all(path: &std::path::Path) -> Vec<WalEntry> {
        WalReader::open(path.to_path_buf())
            .unwrap()
            .entries()
            .unwrap()
    }

    #[test]
    fn roundtrip_single_entry() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();
        drop(writer);
        let entries = read_all(&path);
        assert_eq!(entries.len(), 1);
        assert!(matches!(entries[0], WalEntry::Write { .. }));
    }

    #[test]
    fn multiple_entries_roundtrip() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();
        writer
            .append(&WalEntry::Checkpoint {
                revision: RevisionId(1),
            })
            .unwrap();
        drop(writer);
        assert_eq!(read_all(&path).len(), 2);
    }

    #[test]
    fn rewrite_replaces_previous_contents() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();
        writer.append(&sample_entry()).unwrap();
        writer
            .rewrite(&[WalEntry::Checkpoint {
                revision: RevisionId(2),
            }])
            .unwrap();
        // Appends made after a rewrite must land in the rewritten log.
        writer.append(&sample_entry()).unwrap();
        drop(writer);
        let entries = read_all(&path);
        assert_eq!(entries.len(), 2);
        assert!(matches!(entries[0], WalEntry::Checkpoint { .. }));
        assert!(matches!(entries[1], WalEntry::Write { .. }));
    }

    #[test]
    fn corrupted_checksum_stops_replay() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut writer = WalWriter::open(path.clone()).unwrap();
        writer.append(&sample_entry()).unwrap();
        writer.append(&sample_entry()).unwrap();
        drop(writer);
        // The final byte belongs to the second record's checksum; corrupting it must drop
        // that record while keeping the first.
        let mut bytes = std::fs::read(&path).unwrap();
        *bytes.last_mut().unwrap() ^= 0xFF;
        std::fs::write(&path, &bytes).unwrap();
        assert_eq!(read_all(&path).len(), 1);
    }
}