use std::{
fs::{self, File},
io::{Read, Seek, SeekFrom, Write},
path::Path,
};
use zerocopy::{FromBytes, IntoBytes};
use crate::{
DbError,
crc::{checksum, checksum_append},
wire::{
LogRecordHeader, MutationOp, OXGDB_FORMAT_VERSION, OXGLOGR, SUPERBLOCK_CRC_PREFIX_LEN,
SUPERBLOCK_MAGIC, SuperblockRecord,
},
};
/// File name of the superblock inside the database root directory.
pub(crate) const SUPERBLOCK_FILE: &str = "super.oxgdb";
/// Staging file that `write_superblock` writes and fsyncs before renaming it over
/// [`SUPERBLOCK_FILE`].
const SUPERBLOCK_TEMP_FILE: &str = "super.oxgdb.tmp";
/// One commit record decoded from the delta log by `replay`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct CommitFrame {
    /// Log sequence number from the record header; strictly ascending across a replay.
    pub(crate) lsn: u64,
    /// Transaction id from the record header.
    pub(crate) txn_id: u64,
    /// Generation the record was written against; replay only accepts records whose
    /// generation equals the expected one.
    pub(crate) base_generation: u64,
    /// Mutation ops decoded from the record's fixed-size op array, in order.
    pub(crate) ops: Vec<MutationOp>,
    /// Variable-length trailing bytes that follow the op array.
    pub(crate) blob: Vec<u8>,
}
/// Result of replaying a delta log: the accepted prefix and its summary.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct ReplayOutcome {
    /// Every accepted record, in log order.
    pub(crate) frames: Vec<CommitFrame>,
    /// Byte length of the accepted prefix. Bytes past this point were treated as a
    /// torn tail (presumably the caller truncates the log here — confirm at call site).
    pub(crate) valid_len: usize,
    /// LSN of the last accepted frame, or 0 when no frame was accepted.
    pub(crate) last_lsn: u64,
    /// Transaction id of the last accepted frame, or 0 when no frame was accepted.
    pub(crate) last_txn: u64,
}
/// Encoded size in bytes of a [`LogRecordHeader`].
const HEADER_LEN: usize = size_of::<LogRecordHeader>();
/// Encoded size in bytes of one [`MutationOp`] in a record's op array.
const OP_LEN: usize = size_of::<MutationOp>();
/// Byte offset of the header's 4-byte `crc32c` field.
///
/// This assumes `crc32c` is the final field of `LogRecordHeader` with no trailing
/// padding. NOTE(review): confirm against the struct declaration in `wire`.
const HEADER_CRC_OFFSET: usize = HEADER_LEN - size_of::<u32>();
/// Checksums a whole encoded log record while skipping the header's own `crc32c` slot.
///
/// The digest covers every byte before `HEADER_CRC_OFFSET`, continued with every byte
/// after the four CRC bytes (the op array and blob). Because of this, the value stored
/// in the slot never influences the result.
///
/// # Panics
/// Panics if `record` is shorter than `HEADER_LEN`.
fn record_crc(record: &[u8]) -> u32 {
    let (before_crc, from_crc) = record.split_at(HEADER_CRC_OFFSET);
    let after_crc = &from_crc[size_of::<u32>()..];
    let seed = checksum(before_crc);
    checksum_append(seed, after_crc)
}
/// Serializes one commit into a self-checking delta-log record.
///
/// Layout: a [`LogRecordHeader`], then the raw bytes of every op in order, then `blob`.
/// The header's `crc32c` is computed by `record_crc` over the assembled record, which
/// skips the CRC slot. It is then stored little-endian into that slot.
///
/// # Errors
/// Returns `DbError::invalid_store` if the total record length, or the number of ops,
/// does not fit in the header's `u32` fields. The length is checked first.
pub(crate) fn encode_commit(
    lsn: u64,
    txn_id: u64,
    base_generation: u64,
    ops: &[MutationOp],
    blob: &[u8],
) -> Result<Vec<u8>, DbError> {
    let total = HEADER_LEN + ops.len() * OP_LEN + blob.len();
    let Ok(len_field) = u32::try_from(total) else {
        return Err(DbError::invalid_store("delta-log record too large"));
    };
    let Ok(op_count_field) = u32::try_from(ops.len()) else {
        return Err(DbError::invalid_store("delta-log op count too large"));
    };

    // The CRC slot starts zeroed and is patched once the whole record exists.
    let header = LogRecordHeader {
        magic: OXGLOGR.into(),
        base_generation: base_generation.into(),
        lsn: lsn.into(),
        txn_id: txn_id.into(),
        len: len_field.into(),
        op_count: op_count_field.into(),
        crc32c: 0u32.into(),
    };

    let mut encoded = Vec::with_capacity(total);
    encoded.extend_from_slice(header.as_bytes());
    ops.iter()
        .for_each(|mutation| encoded.extend_from_slice(mutation.as_bytes()));
    encoded.extend_from_slice(blob);

    let crc_bytes = record_crc(&encoded).to_le_bytes();
    encoded[HEADER_CRC_OFFSET..][..crc_bytes.len()].copy_from_slice(&crc_bytes);
    Ok(encoded)
}
/// Verdict of `parse_one_record` for the bytes at the current replay offset.
enum RecordParse {
    /// End replay here without error; the remaining bytes are treated as a torn tail.
    Stop,
    /// Abort replay with this error (corruption that committed data depends on).
    Loud(DbError),
    /// A valid record and the number of bytes it occupies in the log.
    Frame(CommitFrame, usize),
}
/// LSN of the most recently accepted record during replay, used to enforce strictly
/// ascending LSNs.
#[derive(Clone, Copy)]
struct LsnCursor {
    /// LSN of the last accepted record; only meaningful when `seen` is true.
    last_lsn: u64,
    /// Whether any record has been accepted yet.
    seen: bool,
}
/// Splits an already-validated record into its op array and trailing blob.
///
/// `framed_min` is the byte offset where the op array ends, i.e. `HEADER_LEN` plus
/// `op_count * OP_LEN` as computed by the caller. `op_count` only pre-sizes the
/// vector. Op decoding stops at the first chunk that fails to read. That is not
/// expected for exact-size chunks, but the guard is kept.
fn decode_ops(record: &[u8], framed_min: usize, op_count: usize) -> (Vec<MutationOp>, Vec<u8>) {
    let (framed, trailing) = record.split_at(framed_min);
    let mut decoded = Vec::with_capacity(op_count);
    decoded.extend(
        framed[HEADER_LEN..]
            .chunks_exact(OP_LEN)
            .map_while(|chunk| MutationOp::read_from_bytes(chunk).ok()),
    );
    (decoded, trailing.to_vec())
}
fn tail_or_loud(is_last: bool, error: DbError) -> RecordParse {
if is_last {
RecordParse::Stop
} else {
RecordParse::Loud(error)
}
}
/// Decodes the record at the start of `remaining`, or decides why replay must stop.
///
/// `remaining` is the unconsumed suffix of the log. A record is the tail (`is_last`)
/// when its `len` field cannot place its end strictly before the end of `remaining`.
/// That happens when `len` claims fewer than `HEADER_LEN` bytes, or reaches or passes
/// the end of the buffer. Defects in the tail record yield [`RecordParse::Stop`], so a
/// torn final write is truncated silently. The same defects in an interior record yield
/// [`RecordParse::Loud`], because committed records follow it.
///
/// The tail decision depends only on `len`. `op_count` is covered by the record
/// checksum like every other field. If it were also consulted, a corrupted `op_count`
/// in an interior record would be mistaken for a torn tail, and every later committed
/// record would be discarded without an error.
///
/// A record whose LSN does not strictly exceed `cursor.last_lsn` (once a record has
/// been seen) is always reported loudly.
fn parse_one_record(base_generation: u64, remaining: &[u8], cursor: LsnCursor) -> RecordParse {
    // Not even a full header left: treated as a torn tail.
    if remaining.len() < HEADER_LEN {
        return RecordParse::Stop;
    }
    let Ok(header) = LogRecordHeader::read_from_bytes(&remaining[..HEADER_LEN]) else {
        return RecordParse::Stop;
    };
    let len = header.len.get() as usize;
    let op_count = header.op_count.get() as usize;
    let lsn = header.lsn.get();
    // Minimum record size implied by `op_count`; `None` if that size overflows `usize`.
    let body_len = op_count
        .checked_mul(OP_LEN)
        .and_then(|ops_bytes| ops_bytes.checked_add(HEADER_LEN));
    // Interior only when `len` ends this record strictly before the buffer does.
    // `op_count` is deliberately not consulted here (see the doc comment).
    let is_last = len < HEADER_LEN || len >= remaining.len();
    if header.magic.get() != OXGLOGR {
        return tail_or_loud(
            is_last,
            DbError::LogCorrupt {
                lsn,
                reason: "delta-log record magic mismatch",
            },
        );
    }
    let record_generation = header.base_generation.get();
    if record_generation != base_generation {
        return tail_or_loud(
            is_last,
            DbError::BaseGenerationMismatch {
                expected: base_generation,
                found: record_generation,
            },
        );
    }
    // The op array must fit inside the declared length; this also implies
    // `len >= HEADER_LEN`. An overflowing `op_count` can never fit.
    let Some(framed_min) = body_len.filter(|&framed_min| framed_min <= len) else {
        return tail_or_loud(
            is_last,
            DbError::LogCorrupt {
                lsn,
                reason: "delta-log record length undershoots its framing",
            },
        );
    };
    // The declared length runs past the available bytes: treated as a torn tail.
    if len > remaining.len() {
        return RecordParse::Stop;
    }
    let record = &remaining[..len];
    if record_crc(record) != header.crc32c.get() {
        return tail_or_loud(
            is_last,
            DbError::LogCorrupt {
                lsn,
                reason: "delta-log record checksum mismatch",
            },
        );
    }
    if cursor.seen && lsn <= cursor.last_lsn {
        return RecordParse::Loud(DbError::LogCorrupt {
            lsn,
            reason: "delta-log record LSN not strictly ascending",
        });
    }
    let (ops, blob) = decode_ops(record, framed_min, op_count);
    RecordParse::Frame(
        CommitFrame {
            lsn,
            txn_id: header.txn_id.get(),
            base_generation: record_generation,
            ops,
            blob,
        },
        len,
    )
}
/// Replays a delta log from its first byte and returns every accepted record.
///
/// Records are parsed back to back with `parse_one_record`. Replay ends quietly at the
/// end of `log`, or at the first record classified as a torn tail. Any error that
/// function classifies as loud is returned immediately.
///
/// # Errors
/// Returns the `DbError` produced by `parse_one_record` for interior corruption,
/// generation mismatch, or a non-ascending LSN.
pub(crate) fn replay(base_generation: u64, log: &[u8]) -> Result<ReplayOutcome, DbError> {
    let mut outcome = ReplayOutcome {
        frames: Vec::new(),
        valid_len: 0,
        last_lsn: 0,
        last_txn: 0,
    };
    let mut cursor = LsnCursor {
        last_lsn: 0,
        seen: false,
    };
    // `valid_len` doubles as the read offset: it always sits on a record boundary.
    while let Some(rest) = log.get(outcome.valid_len..).filter(|rest| !rest.is_empty()) {
        match parse_one_record(base_generation, rest, cursor) {
            RecordParse::Stop => break,
            RecordParse::Loud(error) => return Err(error),
            RecordParse::Frame(frame, consumed) => {
                cursor = LsnCursor {
                    last_lsn: frame.lsn,
                    seen: true,
                };
                outcome.last_lsn = frame.lsn;
                outcome.last_txn = frame.txn_id;
                outcome.valid_len += consumed;
                outcome.frames.push(frame);
            }
        }
    }
    Ok(outcome)
}
pub(crate) fn read_superblock(root: &Path) -> Result<SuperblockRecord, DbError> {
let path = root.join(SUPERBLOCK_FILE);
let mut file = File::open(&path).map_err(|error| match error.kind() {
std::io::ErrorKind::NotFound => DbError::NotFound,
_other => DbError::io("open superblock", error),
})?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)
.map_err(|error| DbError::io("read superblock", error))?;
let record = SuperblockRecord::read_from_bytes(bytes.as_slice())
.map_err(|_error| DbError::invalid_store("superblock size mismatch"))?;
if record.magic != SUPERBLOCK_MAGIC {
return Err(DbError::invalid_store("superblock magic mismatch"));
}
let expected = checksum(&bytes[..SUPERBLOCK_CRC_PREFIX_LEN]);
if record.crc32c.get() != expected {
return Err(DbError::invalid_store("superblock checksum mismatch"));
}
if record.format_version.get() != OXGDB_FORMAT_VERSION {
return Err(DbError::invalid_store(
"superblock format version unsupported",
));
}
if record.flags.get() != 0 || record.pad.get() != 0 {
return Err(DbError::invalid_store("superblock reserved word not zero"));
}
Ok(record)
}
pub(crate) fn write_superblock(root: &Path, sb: &SuperblockRecord) -> Result<(), DbError> {
fs::create_dir_all(root).map_err(|error| DbError::io("create database directory", error))?;
let mut record = *sb;
record.crc32c = 0u32.into();
let mut bytes = record.as_bytes().to_vec();
let crc = checksum(&bytes[..SUPERBLOCK_CRC_PREFIX_LEN]);
record.crc32c = crc.into();
bytes = record.as_bytes().to_vec();
let temp_path = root.join(SUPERBLOCK_TEMP_FILE);
let mut file =
File::create(&temp_path).map_err(|error| DbError::io("create superblock", error))?;
file.write_all(&bytes)
.map_err(|error| DbError::io("write superblock", error))?;
file.flush()
.map_err(|error| DbError::io("flush superblock", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync superblock", error))?;
fs::rename(&temp_path, root.join(SUPERBLOCK_FILE))
.map_err(|error| DbError::io("publish superblock", error))?;
sync_directory(root)
}
pub(crate) fn append_commit(log: &mut File, frame: &[u8]) -> Result<(), DbError> {
let eof = log
.seek(SeekFrom::End(0))
.map_err(|error| DbError::io("seek delta-log end", error))?;
if let Err(error) = log.write_all(frame) {
rollback_to(log, eof);
return Err(DbError::io("append delta-log record", error));
}
if let Err(error) = log.sync_all() {
rollback_to(log, eof);
return Err(DbError::io("sync delta-log", error));
}
Ok(())
}
/// Best-effort undo of a failed append.
///
/// Truncates `log` back to `eof` and then fsyncs. Both steps always run, and both
/// results are deliberately discarded because the caller is already returning the
/// original failure. NOTE(review): if the truncation itself fails, a partial frame may
/// remain at the end of the log.
fn rollback_to(log: &File, eof: u64) {
    let (_truncated, _synced) = (log.set_len(eof), log.sync_all());
}
#[cfg(unix)]
fn sync_directory(path: &Path) -> Result<(), DbError> {
let directory =
File::open(path).map_err(|error| DbError::io("open database directory", error))?;
directory
.sync_all()
.map_err(|error| DbError::io("sync database directory", error))
}
#[cfg(not(unix))]
fn sync_directory(_path: &Path) -> Result<(), DbError> {
Ok(())
}
// Unit and property tests for commit encoding/replay, superblock persistence, and
// `append_commit`.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU64, Ordering};

    use proptest::prelude::*;
    use zerocopy::byteorder::{LE, U64};

    use super::*;
    use crate::wire::{MUTATION_OP_PAYLOAD_WORDS, OP_CREATE_ELEMENT, OP_NEXT_ID_WATERMARK};

    // Per-process counter so concurrently running tests get distinct temp directories.
    static NEXT_PATH: AtomicU64 = AtomicU64::new(0);

    /// Returns a unique path under the system temp dir, keyed by `name`, the process id
    /// and a per-call counter. The directory itself is not created here.
    fn temp_root(name: &str) -> std::path::PathBuf {
        let id = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!("oxgraph-wal-{name}-{}-{id}", std::process::id()))
    }

    /// Builds a `MutationOp` with the given kind and flags.
    ///
    /// `words` fill the payload from the front and the remaining slots stay zero; any
    /// words beyond the payload length are dropped by `zip`.
    fn op(op_kind: u32, flags: u32, words: &[u64]) -> MutationOp {
        let mut payload = [U64::<LE>::new(0); MUTATION_OP_PAYLOAD_WORDS];
        for (slot, value) in payload.iter_mut().zip(words) {
            *slot = U64::new(*value);
        }
        MutationOp {
            op_kind: op_kind.into(),
            flags: flags.into(),
            payload,
        }
    }

    /// A single encoded commit replays back to identical fields.
    #[test]
    fn single_commit_roundtrips() {
        let ops = vec![op(OP_CREATE_ELEMENT, 0, &[7])];
        let blob = b"hello".to_vec();
        let record = encode_commit(3, 5, 9, &ops, &blob).expect("encode");
        let outcome = replay(9, &record).expect("replay");
        assert_eq!(outcome.valid_len, record.len());
        assert_eq!(outcome.last_lsn, 3);
        assert_eq!(outcome.last_txn, 5);
        assert_eq!(outcome.frames.len(), 1);
        let frame = &outcome.frames[0];
        assert_eq!(frame.ops, ops);
        assert_eq!(frame.blob, blob);
        assert_eq!(frame.base_generation, 9);
    }

    /// Damaging a record that is followed by another record must fail replay rather
    /// than truncate.
    #[test]
    fn interior_corruption_is_loud() {
        let first = encode_commit(1, 1, 0, &[op(OP_CREATE_ELEMENT, 0, &[1])], b"").expect("encode");
        let second =
            encode_commit(2, 2, 0, &[op(OP_CREATE_ELEMENT, 0, &[2])], b"").expect("encode");
        let mut log = first;
        log.extend_from_slice(&second);
        // Flip a byte just past the first record's header (inside its op bytes), so the
        // first, interior, record fails its checksum.
        let flip_at = HEADER_LEN + 8;
        log[flip_at] ^= 0xFF;
        match replay(0, &log) {
            Err(DbError::LogCorrupt { .. }) => {}
            other => panic!("expected LogCorrupt, got {other:?}"),
        }
    }

    /// A well-formed interior record from another generation is reported with the
    /// expected and found generations.
    #[test]
    fn base_generation_mismatch_is_loud() {
        let g1a = encode_commit(1, 1, 1, &[op(OP_CREATE_ELEMENT, 0, &[1])], b"").expect("encode");
        let g2 = encode_commit(2, 2, 2, &[op(OP_CREATE_ELEMENT, 0, &[2])], b"").expect("encode");
        let g1b = encode_commit(3, 3, 1, &[op(OP_CREATE_ELEMENT, 0, &[3])], b"").expect("encode");
        let mut log = g1a;
        log.extend_from_slice(&g2);
        log.extend_from_slice(&g1b);
        match replay(1, &log) {
            Err(DbError::BaseGenerationMismatch { expected, found }) => {
                assert_eq!(expected, 1);
                assert_eq!(found, 2);
            }
            other => panic!("expected BaseGenerationMismatch, got {other:?}"),
        }
    }

    /// Cutting the log a few bytes into the second record keeps only the first record,
    /// without error.
    #[test]
    fn torn_tail_truncates_silently() {
        let first =
            encode_commit(1, 1, 0, &[op(OP_CREATE_ELEMENT, 0, &[1])], b"a").expect("encode");
        let second =
            encode_commit(2, 2, 0, &[op(OP_CREATE_ELEMENT, 0, &[2])], b"bb").expect("encode");
        let mut log = first.clone();
        log.extend_from_slice(&second);
        let cut = first.len() + 4;
        let outcome = replay(0, &log[..cut]).expect("replay");
        assert_eq!(outcome.valid_len, first.len());
        assert_eq!(outcome.frames.len(), 1);
        assert_eq!(outcome.last_lsn, 1);
    }

    /// A written superblock reads back with the same field values.
    #[test]
    fn superblock_roundtrips() {
        let root = temp_root("sb-roundtrip");
        let sb = SuperblockRecord {
            magic: SUPERBLOCK_MAGIC,
            base_generation: 4u64.into(),
            checkpoint_lsn: 11u64.into(),
            log_byte_offset: 0u64.into(),
            commit_seq: 17u64.into(),
            transaction_id: 19u64.into(),
            format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
            flags: 0u32.into(),
            crc32c: 0u32.into(),
            pad: 0u32.into(),
        };
        write_superblock(&root, &sb).expect("write");
        let read = read_superblock(&root).expect("read");
        assert_eq!(read.base_generation.get(), 4);
        assert_eq!(read.checkpoint_lsn.get(), 11);
        assert_eq!(read.commit_seq.get(), 17);
        assert_eq!(read.transaction_id.get(), 19);
        let _ignore = std::fs::remove_dir_all(&root);
    }

    /// Flipping byte 8 of the stored superblock must be rejected as `InvalidStore`.
    /// Which check fires depends on the wire layout.
    #[test]
    fn superblock_corruption_detected() {
        let root = temp_root("sb-corrupt");
        let sb = SuperblockRecord {
            magic: SUPERBLOCK_MAGIC,
            base_generation: 1u64.into(),
            checkpoint_lsn: 0u64.into(),
            log_byte_offset: 0u64.into(),
            commit_seq: 0u64.into(),
            transaction_id: 0u64.into(),
            format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
            flags: 0u32.into(),
            crc32c: 0u32.into(),
            pad: 0u32.into(),
        };
        write_superblock(&root, &sb).expect("write");
        let path = root.join(SUPERBLOCK_FILE);
        let mut bytes = std::fs::read(&path).expect("read bytes");
        bytes[8] ^= 0xFF;
        std::fs::write(&path, &bytes).expect("rewrite");
        match read_superblock(&root) {
            Err(DbError::InvalidStore { .. }) => {}
            other => panic!("expected InvalidStore, got {other:?}"),
        }
        let _ignore = std::fs::remove_dir_all(&root);
    }

    /// A correctly checksummed superblock with a newer format version is rejected.
    #[test]
    fn superblock_unsupported_version_rejected() {
        let root = temp_root("sb-version");
        let sb = SuperblockRecord {
            magic: SUPERBLOCK_MAGIC,
            base_generation: 1u64.into(),
            checkpoint_lsn: 0u64.into(),
            log_byte_offset: 0u64.into(),
            commit_seq: 0u64.into(),
            transaction_id: 0u64.into(),
            format_version: (OXGDB_FORMAT_VERSION + 1).into(),
            flags: 0u32.into(),
            crc32c: 0u32.into(),
            pad: 0u32.into(),
        };
        write_superblock(&root, &sb).expect("write");
        match read_superblock(&root) {
            Err(DbError::InvalidStore { .. }) => {}
            other => panic!("expected InvalidStore, got {other:?}"),
        }
        let _ignore = std::fs::remove_dir_all(&root);
    }

    /// A frame appended through `append_commit` to an append-mode file replays as one
    /// record.
    #[test]
    fn append_commit_then_replay() {
        let root = temp_root("append");
        std::fs::create_dir_all(&root).expect("mkdir");
        let path = root.join("delta-0.log");
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&path)
            .expect("open log");
        let frame =
            encode_commit(1, 1, 0, &[op(OP_CREATE_ELEMENT, 0, &[1])], b"x").expect("encode");
        append_commit(&mut file, &frame).expect("append");
        let bytes = std::fs::read(&path).expect("read log");
        let outcome = replay(0, &bytes).expect("replay");
        assert_eq!(outcome.frames.len(), 1);
        assert_eq!(outcome.valid_len, frame.len());
        let _ignore = std::fs::remove_dir_all(&root);
    }

    /// Generates `(txn_id, ops, blob)`: 0-3 create ops plus a trailing watermark op,
    /// and a blob of 0-11 bytes.
    fn commit_strategy() -> impl Strategy<Value = (u64, Vec<MutationOp>, Vec<u8>)> {
        (
            any::<u64>(),
            prop::collection::vec(any::<u64>(), 0..4),
            prop::collection::vec(any::<u8>(), 0..12),
        )
            .prop_map(|(txn, ids, blob)| {
                let mut ops: Vec<MutationOp> = ids
                    .into_iter()
                    .map(|id| op(OP_CREATE_ELEMENT, 0, &[id]))
                    .collect();
                ops.push(op(OP_NEXT_ID_WATERMARK, 0, &[1, 1, 1, 1, 1, 1, 1, 1, 1]));
                (txn, ops, blob)
            })
    }

    proptest! {
        // Truncating a valid log anywhere must replay exactly the records that end at
        // or before the cut, with consecutive LSNs starting at 1.
        #[test]
        fn arbitrary_truncation_recovers_valid_prefix(
            commits in prop::collection::vec(commit_strategy(), 1..6),
            cut_fraction in 0u64..=1000,
        ) {
            let base_generation = 0u64;
            let mut buf = Vec::new();
            // End offset of each record within `buf`.
            let mut boundaries = Vec::new();
            for (index, (txn, ops, blob)) in commits.iter().enumerate() {
                let lsn = index as u64 + 1;
                let record = encode_commit(lsn, *txn, base_generation, ops, blob).expect("encode");
                buf.extend_from_slice(&record);
                boundaries.push(buf.len());
            }
            // Cut at a proportional offset in 0..=buf.len() (cut_fraction is at most 1000).
            let fraction = usize::try_from(cut_fraction).unwrap_or(usize::MAX);
            let cut = fraction * buf.len() / 1000;
            let expected_len = boundaries
                .iter()
                .copied()
                .rfind(|&boundary| boundary <= cut)
                .unwrap_or(0);
            let expected_count = boundaries.iter().filter(|boundary| **boundary <= cut).count();
            // `?` turns an unexpected replay error into a failed test case.
            let outcome = replay(base_generation, &buf[..cut])?;
            prop_assert_eq!(outcome.valid_len, expected_len);
            prop_assert_eq!(outcome.frames.len(), expected_count);
            for (index, frame) in outcome.frames.iter().enumerate() {
                prop_assert_eq!(frame.lsn, index as u64 + 1);
            }
        }
    }
}