use crate::graph::types::Fact;
use crate::storage::packed_pages::MAX_FACT_BYTES;
use anyhow::{Result, bail};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
/// Magic bytes stored in the first four bytes of every WAL file header.
const WAL_MAGIC: [u8; 4] = *b"MWAL";
/// On-disk format version, stored little-endian in header bytes 4..8.
const WAL_VERSION: u32 = 1;
/// Total header length in bytes; the bytes after the magic and version are written as zero.
const WAL_HEADER_SIZE: usize = 32;
/// Writes a fresh WAL header at the very start of `file` and syncs it to disk.
///
/// The header is `WAL_HEADER_SIZE` bytes long: the magic bytes, the format
/// version as a little-endian `u32`, then zero padding.
///
/// # Errors
/// Returns any I/O error raised while seeking, writing, or syncing.
fn write_wal_header(file: &mut File) -> Result<()> {
    let mut header = [0u8; WAL_HEADER_SIZE];
    let (magic_field, remainder) = header.split_at_mut(WAL_MAGIC.len());
    magic_field.copy_from_slice(&WAL_MAGIC);
    remainder[..4].copy_from_slice(&WAL_VERSION.to_le_bytes());

    // The header always lives at offset 0, wherever the cursor currently is.
    file.rewind()?;
    file.write_all(&header)?;
    file.sync_all()?;
    Ok(())
}
/// Reads the header at the start of `file` and checks its magic and version.
///
/// The cursor is left just past the header (offset `WAL_HEADER_SIZE`) on
/// success.
///
/// # Errors
/// Fails if the header cannot be read in full, if the first four bytes are
/// not `WAL_MAGIC`, or if the stored version differs from `WAL_VERSION`.
fn validate_wal_header(file: &mut File) -> Result<()> {
    let mut header = [0u8; WAL_HEADER_SIZE];
    file.seek(SeekFrom::Start(0))?;
    file.read_exact(&mut header)?;

    // Only the first eight bytes carry meaning; the rest is padding.
    let [m0, m1, m2, m3, v0, v1, v2, v3, ..] = header;
    if [m0, m1, m2, m3] != WAL_MAGIC {
        bail!("Invalid WAL magic number: not a .wal file");
    }

    match u32::from_le_bytes([v0, v1, v2, v3]) {
        WAL_VERSION => Ok(()),
        other => bail!(
            "Unsupported WAL version: {} (expected {})",
            other,
            WAL_VERSION
        ),
    }
}
/// Encodes one WAL entry into its on-disk byte representation.
///
/// Layout (all integers little-endian):
///
/// ```text
/// [crc32: u32][tx_count: u64][fact count: u64]
/// then, per fact: [len: u32][postcard-encoded fact bytes]
/// ```
///
/// The CRC covers every byte that follows the checksum field.
///
/// # Errors
/// Fails if a fact cannot be serialised by `postcard`, or if a serialised
/// fact is larger than `MAX_FACT_BYTES`.
///
/// NOTE(review): the reader in this file stops replay at an entry declaring
/// more than 1_000_000 facts, but no such bound is enforced here — confirm
/// that callers never write entries that large.
fn serialize_entry(tx_count: u64, facts: &[Fact]) -> Result<Vec<u8>> {
    const CSUM_LEN: usize = 4;

    // Reserve the checksum slot first and back-patch it once the payload is
    // complete, so the entry is assembled in a single buffer.
    let mut entry = vec![0u8; CSUM_LEN];
    entry.extend_from_slice(&tx_count.to_le_bytes());
    entry.extend_from_slice(&(facts.len() as u64).to_le_bytes());

    for fact in facts {
        let encoded = postcard::to_allocvec(fact)?;
        let encoded_len = encoded.len();
        if encoded_len > MAX_FACT_BYTES {
            bail!(
                "Fact serialised size {} bytes exceeds maximum {} bytes. \
                 Store large payloads externally and reference them with a \
                 Value::String URL/path or Value::Ref entity ID.",
                encoded_len,
                MAX_FACT_BYTES
            );
        }
        // Bounded by the check above; presumably MAX_FACT_BYTES fits in a
        // u32 (it is defined in storage::packed_pages) — TODO confirm.
        entry.extend_from_slice(&(encoded_len as u32).to_le_bytes());
        entry.extend_from_slice(&encoded);
    }

    let checksum = crc32fast::hash(&entry[CSUM_LEN..]);
    entry[..CSUM_LEN].copy_from_slice(&checksum.to_le_bytes());
    Ok(entry)
}
/// One decoded WAL record: a transaction counter plus the facts stored with it.
#[derive(Debug)]
pub struct WalEntry {
/// Transaction counter read from the entry (the `tx_count` given to `WalWriter::append_entry`).
pub tx_count: u64,
/// Facts decoded from the entry, in the order they appear on disk.
pub facts: Vec<Fact>,
}
/// Writer that appends checksummed entries to a WAL file.
pub struct WalWriter {
/// Open WAL file handle (read + write); `open_or_create` leaves its cursor at end-of-file.
file: File,
}
impl WalWriter {
pub fn open_or_create(path: &Path) -> Result<Self> {
match OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(path)
{
Ok(mut file) => {
write_wal_header(&mut file)?;
file.seek(SeekFrom::End(0))?;
return Ok(WalWriter { file });
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e.into()),
}
let mut file = OpenOptions::new().read(true).write(true).open(path)?;
validate_wal_header(&mut file)?;
file.seek(SeekFrom::End(0))?;
Ok(WalWriter { file })
}
pub fn append_entry(&mut self, tx_count: u64, facts: &[Fact]) -> Result<()> {
let entry_bytes = serialize_entry(tx_count, facts)?;
self.file.write_all(&entry_bytes)?;
self.file.sync_all()?;
Ok(())
}
pub fn delete_file(path: &Path) -> Result<()> {
let retries: u32 = if cfg!(windows) { 10 } else { 1 };
let mut last_err = None;
for i in 0..retries {
match std::fs::remove_file(path) {
Ok(()) => return Ok(()),
Err(e) => {
last_err = Some(e);
if i + 1 < retries {
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
}
}
Err(anyhow::anyhow!(
"failed to delete WAL file {}: {}",
path.display(),
last_err.unwrap()
))
}
}
/// Reader that replays the entries stored in a WAL file.
pub struct WalReader {
/// WAL file handle, opened read-only by `WalReader::open`.
file: File,
}
impl WalReader {
    /// Opens the WAL at `path` read-only and validates its header.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or its header is invalid.
    pub fn open(path: &Path) -> Result<Self> {
        let mut file = File::open(path)?;
        validate_wal_header(&mut file)?;
        Ok(WalReader { file })
    }

    /// Reads every valid entry from the start of the log, in order.
    ///
    /// Replay stops quietly at the first entry that is incomplete or corrupt:
    /// the file ends mid-entry, a declared count or length exceeds the sanity
    /// limits, a fact fails to decode, or the checksum does not match.
    /// Entries before that point are returned.
    ///
    /// # Errors
    /// Returns any I/O error other than hitting end-of-file. A real read
    /// failure must not be mistaken for a truncated log, because that would
    /// silently drop committed entries.
    pub fn read_entries(&mut self) -> Result<Vec<WalEntry>> {
        self.file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
        let mut entries = Vec::new();
        while let Some(entry) = Self::read_entry(&mut self.file)? {
            entries.push(entry);
        }
        Ok(entries)
    }

    /// Reads one entry at the current cursor.
    ///
    /// Returns `Ok(None)` when the log ends here or the entry is invalid.
    fn read_entry(file: &mut File) -> Result<Option<WalEntry>> {
        // Sanity limits so a corrupt count or length cannot trigger a huge
        // loop or allocation before the checksum is verified.
        const MAX_FACTS_PER_ENTRY: u64 = 1_000_000;
        const MAX_FACT_SIZE: usize = 10 * 1024 * 1024;

        let mut csum_buf = [0u8; 4];
        let mut tx_count_buf = [0u8; 8];
        let mut num_facts_buf = [0u8; 8];
        let prefix_complete = Self::fill_or_eof(file, &mut csum_buf)?
            && Self::fill_or_eof(file, &mut tx_count_buf)?
            && Self::fill_or_eof(file, &mut num_facts_buf)?;
        if !prefix_complete {
            return Ok(None);
        }

        let expected_csum = u32::from_le_bytes(csum_buf);
        let tx_count = u64::from_le_bytes(tx_count_buf);
        // Compared as u64 so a narrowing cast on 32-bit targets cannot make
        // an absurd count look acceptable.
        let num_facts = u64::from_le_bytes(num_facts_buf);
        if num_facts > MAX_FACTS_PER_ENTRY {
            return Ok(None);
        }

        // The checksum covers everything after the checksum field; feed it
        // incrementally instead of copying the payload into a second buffer.
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&tx_count_buf);
        hasher.update(&num_facts_buf);

        // Deliberately not pre-sized from `num_facts`, which is unverified.
        let mut facts = Vec::new();
        for _ in 0..num_facts {
            let mut len_buf = [0u8; 4];
            if !Self::fill_or_eof(file, &mut len_buf)? {
                return Ok(None);
            }
            let fact_len = u32::from_le_bytes(len_buf) as usize;
            if fact_len > MAX_FACT_SIZE {
                return Ok(None);
            }

            let mut fact_bytes = vec![0u8; fact_len];
            if !Self::fill_or_eof(file, &mut fact_bytes)? {
                return Ok(None);
            }
            hasher.update(&len_buf);
            hasher.update(&fact_bytes);

            match postcard::from_bytes::<Fact>(&fact_bytes) {
                Ok(fact) => facts.push(fact),
                Err(_) => return Ok(None),
            }
        }

        if hasher.finalize() != expected_csum {
            return Ok(None);
        }
        Ok(Some(WalEntry { tx_count, facts }))
    }

    /// Fills `buf` from `file`.
    ///
    /// Returns `Ok(false)` if the file ends first, and propagates every other
    /// I/O error.
    fn fill_or_eof(file: &mut File, buf: &mut [u8]) -> io::Result<bool> {
        match file.read_exact(buf) {
            Ok(()) => Ok(true),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
            Err(e) => Err(e),
        }
    }
}
// Unit tests for the WAL writer and reader. They use real files in temporary
// directories and are compiled only for test builds on non-wasm32 targets.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use crate::graph::types::{VALID_TIME_FOREVER, Value};
use uuid::Uuid;
// Builds a test fact via `Fact::with_valid_time` with fixed values for the
// remaining arguments.
// NOTE(review): the meaning of the literal arguments (1000, 0) is defined by
// `Fact::with_valid_time`, which is not visible here.
fn make_fact(entity: Uuid, attr: &str, value: Value, tx_count: u64) -> Fact {
Fact::with_valid_time(
entity,
attr.to_string(),
value,
1000,
tx_count,
0,
VALID_TIME_FOREVER,
)
}
// A freshly created WAL contains only the header and yields no entries.
#[test]
fn test_wal_empty_round_trip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let _writer = WalWriter::open_or_create(&path).unwrap();
let mut reader = WalReader::open(&path).unwrap();
let entries = reader.read_entries().unwrap();
assert!(entries.is_empty(), "new WAL should have no entries");
}
// One entry holding one fact survives a write/read round trip.
#[test]
fn test_wal_single_fact_round_trip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let alice = Uuid::new_v4();
let fact = make_fact(alice, ":name", Value::String("Alice".to_string()), 1);
let mut writer = WalWriter::open_or_create(&path).unwrap();
writer.append_entry(1, std::slice::from_ref(&fact)).unwrap();
let mut reader = WalReader::open(&path).unwrap();
let entries = reader.read_entries().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].tx_count, 1);
assert_eq!(entries[0].facts.len(), 1);
assert_eq!(entries[0].facts[0].entity, fact.entity);
assert_eq!(entries[0].facts[0].attribute, fact.attribute);
assert_eq!(entries[0].facts[0].value, fact.value);
}
// One entry holding two facts round-trips with the facts in their original order.
#[test]
fn test_wal_multi_fact_entry_round_trip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let alice = Uuid::new_v4();
let facts = vec![
make_fact(alice, ":name", Value::String("Alice".to_string()), 1),
make_fact(alice, ":age", Value::Integer(30), 1),
];
let mut writer = WalWriter::open_or_create(&path).unwrap();
writer.append_entry(1, &facts).unwrap();
let mut reader = WalReader::open(&path).unwrap();
let entries = reader.read_entries().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].facts.len(), 2);
assert_eq!(entries[0].facts[0].entity, facts[0].entity);
assert_eq!(entries[0].facts[0].attribute, facts[0].attribute);
assert_eq!(entries[0].facts[0].value, facts[0].value);
assert_eq!(entries[0].facts[1].entity, facts[1].entity);
assert_eq!(entries[0].facts[1].attribute, facts[1].attribute);
assert_eq!(entries[0].facts[1].value, facts[1].value);
}
// Two entries appended by the same writer are read back in append order.
#[test]
fn test_wal_multiple_entries_round_trip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let alice = Uuid::new_v4();
let bob = Uuid::new_v4();
let mut writer = WalWriter::open_or_create(&path).unwrap();
writer
.append_entry(
1,
&[make_fact(
alice,
":name",
Value::String("Alice".to_string()),
1,
)],
)
.unwrap();
writer
.append_entry(
2,
&[make_fact(bob, ":name", Value::String("Bob".to_string()), 2)],
)
.unwrap();
let mut reader = WalReader::open(&path).unwrap();
let entries = reader.read_entries().unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].tx_count, 1);
assert_eq!(entries[1].tx_count, 2);
}
// Reopening an existing WAL keeps the earlier entry and appends after it.
#[test]
fn test_wal_reopen_and_append() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let alice = Uuid::new_v4();
let bob = Uuid::new_v4();
let mut writer = WalWriter::open_or_create(&path).unwrap();
writer
.append_entry(
1,
&[make_fact(
alice,
":name",
Value::String("Alice".to_string()),
1,
)],
)
.unwrap();
drop(writer);
// Second writer on the same path: the existing file must be validated, not recreated.
let mut writer = WalWriter::open_or_create(&path).unwrap();
writer
.append_entry(
2,
&[make_fact(bob, ":name", Value::String("Bob".to_string()), 2)],
)
.unwrap();
drop(writer);
let mut reader = WalReader::open(&path).unwrap();
let entries = reader.read_entries().unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].tx_count, 1);
assert_eq!(entries[1].tx_count, 2);
}
// A file whose first four bytes are not the WAL magic is rejected on open.
#[test]
fn test_wal_bad_magic_rejected() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("bad.wal");
// "XXXX" in place of the magic, followed by version 1 (little-endian) and zero padding.
std::fs::write(&path, b"XXXX\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00").unwrap();
let result = WalReader::open(&path);
assert!(result.is_err(), "bad magic should be rejected");
}
// A partial entry after a valid one ends replay without an error, and the
// valid entry is still returned.
#[test]
fn test_wal_truncated_entry_stops_replay() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let alice = Uuid::new_v4();
let fact = make_fact(alice, ":name", Value::String("Alice".to_string()), 1);
let mut writer = WalWriter::open_or_create(&path).unwrap();
writer.append_entry(1, &[fact]).unwrap();
drop(writer);
let mut file = OpenOptions::new().append(true).open(&path).unwrap();
// Five stray bytes: a full 4-byte checksum field plus one byte of an
// incomplete tx_count, simulating a write cut short.
file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF, 0x01]).unwrap(); drop(file);
let mut reader = WalReader::open(&path).unwrap();
let entries = reader.read_entries().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].tx_count, 1);
}
// `delete_file` removes an existing WAL file from disk.
#[test]
fn test_wal_delete_file() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
WalWriter::open_or_create(&path).unwrap();
assert!(path.exists());
WalWriter::delete_file(&path).unwrap();
assert!(!path.exists());
}
// Corrupts a written entry with an oversized length value and expects the
// reader either to fail or to return no entries.
#[test]
fn test_wal_fact_size_limit() {
use crate::graph::types::{Fact, Value};
use uuid::Uuid;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.wal");
let mut writer = WalWriter::open_or_create(&path).unwrap();
let entity = Uuid::new_v4();
let fact = Fact::new(
entity,
":test".to_string(),
Value::String("x".to_string()),
1,
);
writer.append_entry(1, &[fact]).unwrap();
writer.file.sync_all().unwrap();
let mut file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
// Overwrite the four bytes that start 20 bytes before end-of-file with
// 10 MiB + 1, one more than the reader's per-fact size cap.
// NOTE(review): whether those bytes are the fact's length prefix depends on
// the serialised fact size; if they are not, the entry is still dropped, but
// by the checksum check rather than the size cap — confirm which path is hit.
file.seek(std::io::SeekFrom::End(-20)).unwrap();
let huge_len: u32 = (10 * 1024 * 1024 + 1) as u32; file.write_all(&huge_len.to_le_bytes()).unwrap();
drop(file);
let mut reader = WalReader::open(&path).unwrap();
let result = reader.read_entries();
assert!(
result.is_err() || result.unwrap().is_empty(),
"Should fail or return empty on corrupted large fact"
);
}
}