use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use serde::{Deserialize, Serialize};
use crate::datatypes::Value;
/// Four-byte magic that every WAL file starts with (written by [`write_header`]).
pub const WAL_MAGIC: [u8; 4] = [b'K', b'W', b'A', b'L'];
/// On-disk format version byte written immediately after [`WAL_MAGIC`].
pub const WAL_FORMAT_VERSION: u8 = 1;
/// One graph mutation recorded in the write-ahead log.
///
/// Values of this type travel inside [`WalFrame::ops`] and are encoded with bincode by
/// [`append_frame`]. bincode's default encoding is positional (enum variants by index,
/// struct fields in declaration order), so reordering or inserting variants or fields here
/// changes the on-disk format — bump [`WAL_FORMAT_VERSION`] when doing so.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum MutationOp {
/// Presumably an insert-or-update of the node keyed by `node_type` + `id`; the exact
/// merge semantics live in whatever replays the log, not in this module.
UpsertNode {
node_type: String,
id: Value,
title: Value,
/// `(String, Value)` pairs — presumably property name and value.
properties: Vec<(String, Value)>,
},
/// Presumably deletes the node keyed by `node_type` + `id` (semantics defined by the replayer).
RemoveNode { node_type: String, id: Value },
/// Presumably an insert-or-update of a `conn_type` edge between the source node
/// (`src_type`, `src_id`) and the target node (`tgt_type`, `tgt_id`).
UpsertEdge {
conn_type: String,
src_type: String,
src_id: Value,
tgt_type: String,
tgt_id: Value,
/// `(String, Value)` pairs — presumably property name and value.
properties: Vec<(String, Value)>,
},
/// Presumably deletes the `conn_type` edge between the given source and target nodes.
RemoveEdge {
conn_type: String,
src_type: String,
src_id: Value,
tgt_type: String,
tgt_id: Value,
},
}
/// One WAL record: a batch of [`MutationOp`]s tagged with an `lsn`.
///
/// Written by [`append_frame`] and read back by [`read_frames`]; on disk each frame is
/// length-prefixed and CRC-32 checked. Because it is bincode-encoded, field order here is
/// part of the on-disk format.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WalFrame {
/// Presumably a log sequence number assigned by the caller. This module never checks that
/// it is unique or increasing.
pub lsn: u64,
/// Mutations carried by this frame, kept in `Vec` order through encode/decode.
pub ops: Vec<MutationOp>,
}
/// Lookup table for the reflected CRC-32 polynomial `0xEDB8_8320`, built once on first use.
fn crc32_table() -> &'static [u32; 256] {
    static TABLE: OnceLock<[u32; 256]> = OnceLock::new();
    TABLE.get_or_init(|| {
        let mut entries = [0u32; 256];
        for (byte, entry) in entries.iter_mut().enumerate() {
            // `byte` < 256, so the cast is lossless. Eight shift/xor rounds process one bit
            // each of the (reflected) input byte.
            *entry = (0..8).fold(byte as u32, |acc, _| {
                let shifted = acc >> 1;
                if acc & 1 == 0 {
                    shifted
                } else {
                    shifted ^ 0xEDB8_8320
                }
            });
        }
        entries
    })
}

/// Standard table-driven CRC-32 (the zlib/IEEE variant: check value of `b"123456789"` is
/// `0xCBF4_3926`).
///
/// Starts from an all-ones register and inverts the result, so `crc32(b"") == 0`.
pub fn crc32(data: &[u8]) -> u32 {
    let lut = crc32_table();
    let register = data.iter().fold(!0u32, |reg, &byte| {
        // Index by the low byte of the register xor the input byte.
        lut[usize::from((reg as u8) ^ byte)] ^ (reg >> 8)
    });
    !register
}
/// Writes the 5-byte WAL header: [`WAL_MAGIC`] followed by the [`WAL_FORMAT_VERSION`] byte.
///
/// The header is passed to `w` as one buffer, so an unbuffered `File` receives it in a
/// single write call. With two separate calls, a crash between them could leave a 4-byte
/// file whose missing version byte makes [`read_header`] fail.
///
/// # Errors
///
/// Whatever `w.write_all` returns.
pub fn write_header(w: &mut impl Write) -> io::Result<()> {
    let [m0, m1, m2, m3] = WAL_MAGIC;
    w.write_all(&[m0, m1, m2, m3, WAL_FORMAT_VERSION])
}
/// Serializes `frame` and writes it to `w` as one length-prefixed, checksummed record.
///
/// Record layout (integers little-endian):
///
/// | bytes | field                         |
/// |-------|-------------------------------|
/// | 4     | payload length `n` (`u32`)    |
/// | 4     | CRC-32 of the payload (`u32`) |
/// | n     | bincode-encoded [`WalFrame`]  |
///
/// The record is assembled in memory and handed to `w` in a single `write_all`. An
/// unbuffered `File` (as used by `Wal::append`) therefore sees one write call per frame
/// instead of three, and a partially written record is less likely.
///
/// # Errors
///
/// `InvalidData` if serialization fails or the payload is larger than `u32::MAX` bytes
/// (nothing is written in either case); otherwise whatever `w` returns.
pub fn append_frame(w: &mut impl Write, frame: &WalFrame) -> io::Result<()> {
    let payload =
        bincode::serialize(frame).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "WAL frame exceeds 4 GiB"))?;
    let crc = crc32(&payload);

    // 4-byte length + 4-byte CRC + payload.
    let mut record = Vec::with_capacity(8 + payload.len());
    record.extend_from_slice(&len.to_le_bytes());
    record.extend_from_slice(&crc.to_le_bytes());
    record.extend_from_slice(&payload);
    w.write_all(&record)
}
/// `read_exact` variant that turns running out of input into `Ok(None)`.
///
/// Returns `Ok(Some(()))` when `buf` was filled completely. It returns `Ok(None)` when the
/// reader hit end-of-stream first, even part-way through `buf`; the contents of `buf` are
/// then unspecified. Any other I/O error is passed through unchanged.
fn read_exact_opt(r: &mut impl Read, buf: &mut [u8]) -> io::Result<Option<()>> {
    r.read_exact(buf).map(Some).or_else(|err| {
        if err.kind() == io::ErrorKind::UnexpectedEof {
            Ok(None)
        } else {
            Err(err)
        }
    })
}
/// Reads the 5-byte WAL header and returns its format-version byte.
///
/// Only the magic is validated here. Whether the version is supported is the caller's
/// decision (see [`read_frames`]).
///
/// # Errors
///
/// * `InvalidData` if the first four bytes are not [`WAL_MAGIC`].
/// * `UnexpectedEof` if the input ends before the magic or the version byte is complete.
/// * Any other read error unchanged.
pub fn read_header(r: &mut impl Read) -> io::Result<u8> {
    let mut found = [0u8; 4];
    r.read_exact(&mut found)?;
    if found == WAL_MAGIC {
        let mut version_byte = [0u8];
        r.read_exact(&mut version_byte)?;
        let [version] = version_byte;
        Ok(version)
    } else {
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "not a kglite WAL file (bad magic)",
        ))
    }
}
/// Validates the WAL header, then decodes frames until the log ends.
///
/// Decoding stops without error at the first frame that is truncated, fails its CRC-32
/// check, or does not deserialize, and every frame before it is returned. This is how a torn
/// trailing write is discarded. Frames located after a bad frame are never returned.
///
/// # Errors
///
/// * The header is missing or has the wrong magic (see [`read_header`]).
/// * The format version is not [`WAL_FORMAT_VERSION`].
/// * The reader fails with anything other than end-of-stream.
pub fn read_frames(mut r: impl Read) -> io::Result<Vec<WalFrame>> {
    // Upper bound on the per-frame buffer reserved before any payload bytes arrive.
    const MAX_PREALLOC: usize = 64 * 1024;

    let version = read_header(&mut r)?;
    if version != WAL_FORMAT_VERSION {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "unsupported WAL format version {version} (this build writes \
                 v{WAL_FORMAT_VERSION}); checkpoint with an older build first"
            ),
        ));
    }

    let mut frames = Vec::new();
    loop {
        let mut len_buf = [0u8; 4];
        if read_exact_opt(&mut r, &mut len_buf)?.is_none() {
            break; // end of log, or a torn length prefix
        }
        let mut crc_buf = [0u8; 4];
        if read_exact_opt(&mut r, &mut crc_buf)?.is_none() {
            break; // torn checksum
        }
        let len = u32::from_le_bytes(len_buf);
        let expected_crc = u32::from_le_bytes(crc_buf);

        // The length prefix is untrusted: a torn or corrupted tail can claim up to 4 GiB.
        // Reading through `take` grows the buffer only as bytes actually arrive, instead of
        // allocating `len` bytes up front and risking an OOM abort during recovery.
        let want = len as usize;
        let mut payload = Vec::with_capacity(want.min(MAX_PREALLOC));
        let got = r.by_ref().take(u64::from(len)).read_to_end(&mut payload)?;
        if got != want {
            break; // torn payload
        }
        if crc32(&payload) != expected_crc {
            break; // corrupted frame
        }
        match bincode::deserialize::<WalFrame>(&payload) {
            Ok(frame) => frames.push(frame),
            Err(_) => break,
        }
    }
    Ok(frames)
}
/// Returns the WAL path paired with a checkpoint file by appending `-wal` to the
/// checkpoint path's raw text (e.g. `graph.kgl` → `graph.kgl-wal`).
///
/// The suffix is pushed onto the underlying `OsString`, so non-UTF-8 paths pass through
/// unchanged.
pub fn wal_path(checkpoint: &Path) -> PathBuf {
    let mut raw = checkpoint.to_path_buf().into_os_string();
    raw.push("-wal");
    raw.into()
}
/// Reads every intact frame from the WAL file at `path`.
///
/// A missing file and a zero-length file both yield an empty list, because neither can
/// contain a frame. A zero-length file is what a crash leaves if it happens after the file
/// was created or truncated to zero but before its header was written.
///
/// # Errors
///
/// Any other error opening or inspecting the file, plus everything [`read_frames`]
/// reports (bad magic, unsupported version, read failures).
pub fn recover(path: &Path) -> io::Result<Vec<WalFrame>> {
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    // `read_frames` rejects input with no header at all. Without this check, a header-less
    // crash artifact would become a permanent startup error.
    if file.metadata()?.len() == 0 {
        return Ok(Vec::new());
    }
    read_frames(BufReader::new(file))
}
/// Handle to an append-only WAL file on disk.
///
/// Created with [`Wal::open`]. Each [`Wal::append`] is followed by `sync_data` before it
/// returns, and [`Wal::reset`] truncates the log back to a bare header.
#[derive(Debug)]
pub struct Wal {
    file: File,
    path: PathBuf,
}

impl Wal {
    /// Opens the WAL file at `path` for appending, creating it if necessary.
    ///
    /// If the opened file is empty, a fresh header is written and the file is `sync_all`ed
    /// before returning. Existing contents are neither validated nor truncated.
    ///
    /// # Errors
    ///
    /// Any I/O error from opening, inspecting, or initialising the file.
    pub fn open(path: PathBuf) -> io::Result<Self> {
        let mut file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&path)?;
        // Check the opened handle rather than calling `path.exists()` beforehand. A file that
        // exists but is empty (crash right after creation, or between the truncate and the
        // header write in `reset`) must still get a header. Otherwise frames would be
        // appended with no magic in front of them and the whole WAL would fail to parse.
        // Checking the handle also closes the exists()/open() race.
        if file.metadata()?.len() == 0 {
            write_header(&mut file)?;
            file.sync_all()?;
        }
        // NOTE(review): new frames are appended after whatever the file already holds. If a
        // crash left a torn trailing frame, frames appended now sit behind it, and
        // `read_frames` stops at the torn frame and never reaches them. Presumably callers
        // replay and `reset()` before appending again — TODO confirm. The parent directory is
        // also not fsynced after creating the file.
        Ok(Self { file, path })
    }

    /// Appends one frame and fsyncs its data before returning.
    ///
    /// # Errors
    ///
    /// Serialization failures from [`append_frame`], and any write or fsync error.
    pub fn append(&mut self, frame: &WalFrame) -> io::Result<()> {
        append_frame(&mut self.file, frame)?;
        // `File` is unbuffered, so this flush is effectively a no-op; `sync_data` is what
        // pushes the frame to stable storage.
        self.file.flush()?;
        self.file.sync_data()?;
        Ok(())
    }

    /// Discards every frame and leaves only a freshly written header, then fsyncs.
    ///
    /// # Errors
    ///
    /// Any truncate, write, or fsync error.
    pub fn reset(&mut self) -> io::Result<()> {
        // The file is in append mode, so after truncating to zero bytes the header is
        // written at offset 0. A crash before the header lands leaves an empty file, which
        // `open` re-initialises.
        self.file.set_len(0)?;
        write_header(&mut self.file)?;
        self.file.sync_all()?;
        Ok(())
    }

    /// Path this WAL was opened with.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// A mix of node/edge upserts and a removal, used as frame contents throughout.
    fn sample_ops() -> Vec<MutationOp> {
        vec![
            MutationOp::UpsertNode {
                node_type: "Person".to_string(),
                id: Value::Int64(1),
                title: Value::String("Alice".to_string()),
                properties: vec![
                    ("age".to_string(), Value::Int64(30)),
                    ("city".to_string(), Value::String("Oslo".to_string())),
                ],
            },
            MutationOp::UpsertEdge {
                conn_type: "KNOWS".to_string(),
                src_type: "Person".to_string(),
                src_id: Value::Int64(1),
                tgt_type: "Person".to_string(),
                tgt_id: Value::Int64(2),
                properties: vec![("since".to_string(), Value::Int64(2020))],
            },
            MutationOp::RemoveNode {
                node_type: "Person".to_string(),
                id: Value::Int64(9),
            },
        ]
    }

    /// In-memory WAL image: a header followed by `frames`.
    fn write_wal(frames: &[WalFrame]) -> Vec<u8> {
        let mut buf = Vec::new();
        write_header(&mut buf).unwrap();
        for f in frames {
            append_frame(&mut buf, f).unwrap();
        }
        buf
    }

    /// A frame carrying the sample ops with the given LSN.
    fn frame(lsn: u64) -> WalFrame {
        WalFrame {
            lsn,
            ops: sample_ops(),
        }
    }

    #[test]
    fn crc32_matches_known_vector() {
        assert_eq!(crc32(b"123456789"), 0xCBF4_3926);
        assert_eq!(crc32(b""), 0);
    }

    #[test]
    fn single_frame_round_trips() {
        let frame = WalFrame {
            lsn: 1,
            ops: sample_ops(),
        };
        let bytes = write_wal(std::slice::from_ref(&frame));
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, vec![frame]);
    }

    #[test]
    fn multiple_frames_preserve_order() {
        let frames = vec![
            WalFrame {
                lsn: 1,
                ops: vec![MutationOp::RemoveNode {
                    node_type: "T".into(),
                    id: Value::Int64(1),
                }],
            },
            WalFrame {
                lsn: 2,
                ops: sample_ops(),
            },
            WalFrame {
                lsn: 3,
                ops: vec![],
            },
        ];
        let bytes = write_wal(&frames);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, frames);
    }

    #[test]
    fn torn_trailing_frame_is_discarded() {
        let frames = vec![
            WalFrame {
                lsn: 1,
                ops: sample_ops(),
            },
            WalFrame {
                lsn: 2,
                ops: sample_ops(),
            },
        ];
        let mut bytes = write_wal(&frames);
        bytes.truncate(bytes.len() - 5);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, vec![frames[0].clone()]);
    }

    #[test]
    fn truncated_in_length_prefix_is_clean_stop() {
        let frames = vec![WalFrame {
            lsn: 1,
            ops: sample_ops(),
        }];
        let mut bytes = write_wal(&frames);
        bytes.extend_from_slice(&[0u8, 0u8]);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, frames);
    }

    #[test]
    fn corrupt_payload_crc_mismatch_stops() {
        let frame = WalFrame {
            lsn: 1,
            ops: sample_ops(),
        };
        let mut bytes = write_wal(std::slice::from_ref(&frame));
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert!(got.is_empty(), "corrupt frame must not be returned");
    }

    #[test]
    fn crc_mismatch_in_later_frame_keeps_earlier_frames() {
        let frames = vec![frame(1), frame(2)];
        let mut bytes = write_wal(&frames);
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, vec![frames[0].clone()]);
    }

    #[test]
    fn header_only_wal_yields_no_frames() {
        let bytes = write_wal(&[]);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert!(got.is_empty());
    }

    #[test]
    fn bad_magic_is_rejected() {
        let bytes = b"XXXX\x01".to_vec();
        assert!(read_frames(Cursor::new(bytes)).is_err());
    }

    #[test]
    fn unsupported_version_is_rejected() {
        let mut bytes = write_wal(&[frame(1)]);
        // The version byte sits right after the magic.
        bytes[WAL_MAGIC.len()] = WAL_FORMAT_VERSION.wrapping_add(1);
        let err = read_frames(Cursor::new(bytes)).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn empty_reader_is_error() {
        let bytes: Vec<u8> = Vec::new();
        assert!(read_frames(Cursor::new(bytes)).is_err());
    }

    #[test]
    fn open_creates_with_header_and_appends_survive_reopen() {
        let dir = TempDir::new().unwrap();
        let p = dir.path().join("g.kgl-wal");
        {
            let mut wal = Wal::open(p.clone()).unwrap();
            wal.append(&frame(1)).unwrap();
            wal.append(&frame(2)).unwrap();
        }
        {
            let mut wal = Wal::open(p.clone()).unwrap();
            wal.append(&frame(3)).unwrap();
        }
        let frames = recover(&p).unwrap();
        assert_eq!(frames.iter().map(|f| f.lsn).collect::<Vec<_>>(), [1, 2, 3]);
    }

    #[test]
    fn reset_truncates_to_header_only() {
        let dir = TempDir::new().unwrap();
        let p = dir.path().join("g.kgl-wal");
        let mut wal = Wal::open(p.clone()).unwrap();
        wal.append(&frame(1)).unwrap();
        wal.append(&frame(2)).unwrap();
        wal.reset().unwrap();
        // "Header only" is literal: the magic plus the version byte, nothing else.
        assert_eq!(
            std::fs::metadata(&p).unwrap().len(),
            (WAL_MAGIC.len() + 1) as u64
        );
        assert!(recover(&p).unwrap().is_empty());
        wal.append(&frame(5)).unwrap();
        assert_eq!(
            recover(&p)
                .unwrap()
                .iter()
                .map(|f| f.lsn)
                .collect::<Vec<_>>(),
            [5]
        );
    }

    #[test]
    fn recover_missing_file_is_empty() {
        let dir = TempDir::new().unwrap();
        let p = dir.path().join("does-not-exist.kgl-wal");
        assert!(recover(&p).unwrap().is_empty());
    }

    #[test]
    fn wal_path_appends_suffix() {
        assert_eq!(
            wal_path(Path::new("/data/graph.kgl")),
            PathBuf::from("/data/graph.kgl-wal")
        );
    }
}