use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use anyhow::{Context, Result, bail};
use crate::model::Op;
/// Serializes `op` with bincode and appends one complete frame to `buf`.
///
/// A frame consists of, in order: the payload length as a little-endian `u32`, the
/// payload bytes, and the CRC-32 of the payload as a little-endian `u32`.
///
/// # Errors
///
/// Fails when bincode cannot serialize `op` or when the payload length does not fit
/// in a `u32`. Both checks run before anything is written, so `buf` is left untouched
/// on error.
fn frame(op: &Op, buf: &mut Vec<u8>) -> Result<()> {
    let body = bincode::serialize(op).context("bincode serialize")?;
    let body_len = u32::try_from(body.len()).context("payload too large for u32 len")?;
    let checksum = crc32fast::hash(&body);

    let header = body_len.to_le_bytes();
    let trailer = checksum.to_le_bytes();
    for part in [&header[..], &body[..], &trailer[..]] {
        buf.extend_from_slice(part);
    }
    Ok(())
}
/// Outcome of trying to decode a single frame at a given offset of the log buffer.
enum ParseResult {
/// A frame decoded cleanly: the operation it carried and the offset just past the frame.
Good(Op, usize),
/// The bytes from the offset to the end of the buffer are not a complete, valid frame
/// (too short, or a final frame that fails its CRC or decode check).
TornTail,
/// The offset is at or beyond the end of the buffer; there is nothing left to decode.
Eof,
/// A frame failed its CRC or decode check while more bytes follow it; carries a
/// human-readable description of the failure.
Corruption(String),
}
/// Decodes the frame that starts at byte offset `pos` of `data`.
///
/// Returns:
/// - `Eof` when `pos` is at or past the end of `data`.
/// - `Good(op, next)` for a well-formed frame, where `next` is the offset just past it.
/// - `TornTail` when the frame is incomplete, or when it fails its CRC / decode check
///   and nothing follows it (treated as an interrupted final write).
/// - `Corruption` when the frame fails its CRC / decode check but more bytes follow it.
///
/// All size arithmetic is checked, so an arbitrary length prefix can never cause an
/// overflow or an out-of-bounds slice, regardless of the target's pointer width.
fn parse_frame(data: &[u8], pos: usize) -> ParseResult {
    /// Size in bytes of the length prefix and of the CRC trailer.
    const WORD: usize = 4;

    /// Reads a little-endian `u32` from a slice that is exactly `WORD` bytes long.
    fn le_u32(bytes: &[u8]) -> u32 {
        let mut word = [0u8; WORD];
        word.copy_from_slice(bytes);
        u32::from_le_bytes(word)
    }

    let remaining = match data.get(pos..) {
        Some(rest) if !rest.is_empty() => rest,
        _ => return ParseResult::Eof,
    };
    if remaining.len() < WORD {
        return ParseResult::TornTail;
    }
    let (prefix, after_prefix) = remaining.split_at(WORD);

    // A declared length that does not fit in `usize`, overflows once the prefix and
    // trailer are added, or exceeds the available bytes can only be an incomplete frame.
    let frame_size = usize::try_from(le_u32(prefix))
        .ok()
        .and_then(|len| len.checked_add(2 * WORD));
    let frame_size = match frame_size {
        Some(size) if size <= remaining.len() => size,
        _ => return ParseResult::TornTail,
    };

    let (payload, after_payload) = after_prefix.split_at(frame_size - 2 * WORD);
    let stored_crc = le_u32(&after_payload[..WORD]);
    let computed_crc = crc32fast::hash(payload);

    // Only a frame with nothing after it can be the product of an interrupted append;
    // a bad frame followed by more data means the log itself is damaged.
    let is_last = remaining.len() == frame_size;

    if computed_crc != stored_crc {
        return if is_last {
            ParseResult::TornTail
        } else {
            ParseResult::Corruption(format!(
                "CRC mismatch at offset {pos}: expected {computed_crc:#010x}, got {stored_crc:#010x}"
            ))
        };
    }

    match bincode::deserialize::<Op>(payload) {
        // `frame_size <= remaining.len()`, so `pos + frame_size <= data.len()`.
        Ok(op) => ParseResult::Good(op, pos + frame_size),
        Err(_) if is_last => ParseResult::TornTail,
        Err(e) => ParseResult::Corruption(format!("bincode decode failed at offset {pos}: {e}")),
    }
}
/// Decodes consecutive frames starting at the beginning of `data`.
///
/// Returns the decoded operations together with the byte length of the valid prefix,
/// i.e. the offset just past the last good frame. Decoding stops quietly when the end
/// of the buffer or a torn tail is reached.
///
/// # Errors
///
/// Returns an error whose message starts with `log corruption:` when `parse_frame`
/// reports corruption.
fn parse_all_frames(data: &[u8]) -> Result<(Vec<Op>, usize)> {
    let mut decoded = Vec::new();
    // Offset of the next frame to decode. It only advances past good frames, so it is
    // also the length of the valid prefix.
    let mut cursor = 0usize;
    loop {
        let (op, next) = match parse_frame(data, cursor) {
            ParseResult::Good(op, next) => (op, next),
            ParseResult::Eof | ParseResult::TornTail => return Ok((decoded, cursor)),
            ParseResult::Corruption(msg) => bail!("log corruption: {msg}"),
        };
        decoded.push(op);
        cursor = next;
    }
}
/// Storage that an `OpLog` writes its frames to.
enum Backend {
/// Frames are written to `file`; `path` is the location that file was opened from.
File { file: File, path: PathBuf },
/// Frames are accumulated in an in-process byte buffer only.
Memory { buf: Vec<u8> },
}
/// Append-only log of framed `Op` values, stored either in a file or in memory.
pub struct OpLog {
// Where the frames are stored.
backend: Backend,
}
impl OpLog {
pub fn open(path: &Path) -> Result<(OpLog, Vec<Op>)> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
.with_context(|| format!("open log at {}", path.display()))?;
let mut data = Vec::new();
file.read_to_end(&mut data).context("read log file")?;
let (ops, good_len) = parse_all_frames(&data)?;
if good_len < data.len() {
file.set_len(good_len as u64)
.context("truncate torn log tail")?;
}
file.seek(SeekFrom::Start(good_len as u64))
.context("seek to end of log")?;
let path = path.to_path_buf();
Ok((
OpLog {
backend: Backend::File { file, path },
},
ops,
))
}
pub fn in_memory() -> OpLog {
OpLog {
backend: Backend::Memory { buf: Vec::new() },
}
}
pub fn append(&mut self, op: &Op) -> Result<()> {
let mut frame_buf = Vec::new();
frame(op, &mut frame_buf)?;
match &mut self.backend {
Backend::File { file, .. } => {
file.write_all(&frame_buf).context("write log frame")?;
}
Backend::Memory { buf } => {
buf.extend_from_slice(&frame_buf);
}
}
Ok(())
}
pub fn sync(&mut self) -> Result<()> {
match &mut self.backend {
Backend::File { file, .. } => {
file.sync_all().context("sync log")?;
}
Backend::Memory { .. } => {}
}
Ok(())
}
pub fn rewrite(&mut self, ops: &[Op]) -> Result<()> {
match &mut self.backend {
Backend::File { path, .. } => {
let path = path.clone();
let dir = path.parent().unwrap_or(Path::new("."));
let mut frame_buf = Vec::new();
for op in ops {
frame(op, &mut frame_buf)?;
}
let tmp_path = dir.join("log.tmp");
{
let mut tmp = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp_path)
.context("open log.tmp")?;
tmp.write_all(&frame_buf).context("write log.tmp")?;
tmp.sync_all().context("sync log.tmp")?;
}
std::fs::rename(&tmp_path, &path).context("rename log.tmp over log")?;
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(false)
.truncate(false)
.open(&path)
.context("reopen log after rewrite")?;
let new_end = frame_buf.len() as u64;
file.seek(SeekFrom::Start(new_end))
.context("seek to end after rewrite")?;
self.backend = Backend::File { file, path };
Ok(())
}
Backend::Memory { buf } => {
buf.clear();
for op in ops {
frame(op, buf)?;
}
Ok(())
}
}
}
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::io::Write;
    use std::path::Path;

    use super::*;
    use crate::model::Op;

    /// Garbage CRC trailer used to build deliberately invalid frames.
    const BAD_CRC: u32 = 0xDEADBEEF;

    /// A fixed sequence of ops: create, set-meta, upsert, delete and drop on one collection.
    fn make_ops() -> Vec<Op> {
        vec![
            Op::CreateCollection {
                collection: "col1".into(),
            },
            Op::SetMeta {
                collection: "col1".into(),
                meta: {
                    let mut m = BTreeMap::new();
                    m.insert("k".into(), "v".into());
                    m
                },
            },
            Op::Upsert {
                collection: "col1".into(),
                id: "doc1".into(),
                row: 0,
                attrs: BTreeMap::new(),
            },
            Op::Delete {
                collection: "col1".into(),
                id: "doc1".into(),
            },
            Op::DropCollection {
                collection: "col1".into(),
            },
        ]
    }

    /// Shorthand for a `CreateCollection` op on `name`.
    fn create(name: &str) -> Op {
        Op::CreateCollection {
            collection: name.into(),
        }
    }

    /// Frames every op in `ops` into one contiguous buffer.
    fn encode(ops: &[Op]) -> Vec<u8> {
        let mut buf = Vec::new();
        for op in ops {
            frame(op, &mut buf).unwrap();
        }
        buf
    }

    /// Builds a complete frame for `op` whose CRC trailer is deliberately wrong.
    fn bad_crc_frame(op: &Op) -> Vec<u8> {
        let payload = bincode::serialize(op).unwrap();
        let len = u32::try_from(payload.len()).unwrap();
        let mut out = Vec::new();
        out.extend_from_slice(&len.to_le_bytes());
        out.extend_from_slice(&payload);
        out.extend_from_slice(&BAD_CRC.to_le_bytes());
        out
    }

    /// Creates a fresh log at `path` holding `ops`, syncs and closes it, and returns the
    /// resulting file length.
    fn write_log(path: &Path, ops: &[Op]) -> u64 {
        let (mut log, replayed) = OpLog::open(path).unwrap();
        assert!(replayed.is_empty());
        for op in ops {
            log.append(op).unwrap();
        }
        log.sync().unwrap();
        drop(log);
        std::fs::metadata(path).unwrap().len()
    }

    /// Appends raw bytes to the file at `path`, bypassing `OpLog`.
    fn append_raw(path: &Path, bytes: &[u8]) {
        let mut f = std::fs::OpenOptions::new()
            .append(true)
            .open(path)
            .unwrap();
        f.write_all(bytes).unwrap();
    }

    /// Reopens the log and checks that exactly `ops` are replayed and that the file has
    /// been truncated back to `len` bytes.
    fn assert_recovers(path: &Path, ops: &[Op], len: u64) {
        let (_, replayed) = OpLog::open(path).unwrap();
        assert_eq!(replayed.as_slice(), ops);
        assert_eq!(std::fs::metadata(path).unwrap().len(), len);
    }

    /// Decodes the contents of an in-memory log, requiring every byte to be a valid frame.
    fn memory_ops(log: &OpLog) -> Vec<Op> {
        match &log.backend {
            Backend::Memory { buf } => {
                let (ops, good_len) = parse_all_frames(buf).unwrap();
                assert_eq!(good_len, buf.len());
                ops
            }
            Backend::File { .. } => panic!("expected an in-memory log"),
        }
    }

    #[test]
    fn frame_round_trip_single() {
        let op = create("test");
        let buf = encode(std::slice::from_ref(&op));
        let (ops, good_len) = parse_all_frames(&buf).unwrap();
        assert_eq!(ops, vec![op]);
        assert_eq!(good_len, buf.len());
    }

    #[test]
    fn frame_round_trip_many() {
        let orig = make_ops();
        let buf = encode(&orig);
        let (ops, good_len) = parse_all_frames(&buf).unwrap();
        assert_eq!(ops, orig);
        assert_eq!(good_len, buf.len());
    }

    #[test]
    fn parse_empty_buffer() {
        let (ops, good_len) = parse_all_frames(&[]).unwrap();
        assert!(ops.is_empty());
        assert_eq!(good_len, 0);
    }

    #[test]
    fn parse_truncated_length_field() {
        let (ops, good_len) = parse_all_frames(&[0x01, 0x00]).unwrap();
        assert!(ops.is_empty());
        assert_eq!(good_len, 0);
    }

    #[test]
    fn parse_torn_tail_truncated_payload() {
        let orig = make_ops();
        let mut buf = encode(&orig);
        let good_len = buf.len();
        // Length prefix promises 50 payload bytes, but only 10 follow.
        buf.extend_from_slice(&50u32.to_le_bytes());
        buf.extend_from_slice(&[0xABu8; 10]);
        let (ops, recovered_len) = parse_all_frames(&buf).unwrap();
        assert_eq!(ops, orig);
        assert_eq!(recovered_len, good_len);
    }

    #[test]
    fn parse_torn_tail_bad_crc() {
        let orig = make_ops();
        let mut buf = encode(&orig);
        let good_len = buf.len();
        buf.extend_from_slice(&bad_crc_frame(&create("last")));
        let (ops, recovered_len) = parse_all_frames(&buf).unwrap();
        assert_eq!(ops, orig);
        assert_eq!(recovered_len, good_len);
    }

    #[test]
    fn parse_corruption_in_middle() {
        // good frame, bad-CRC frame, good frame: the bad one is not at the tail.
        let mut buf = encode(&[create("a")]);
        buf.extend_from_slice(&bad_crc_frame(&create("bad")));
        buf.extend_from_slice(&encode(&[create("b")]));
        let result = parse_all_frames(&buf);
        assert!(result.is_err(), "expected corruption error");
        assert!(result.unwrap_err().to_string().contains("corruption"));
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn file_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log");
        let ops = make_ops();
        write_log(&path, &ops);
        let (_, replayed) = OpLog::open(&path).unwrap();
        assert_eq!(replayed, ops);
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn crash_recovery_garbage_tail() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log");
        let ops = make_ops();
        let good_len = write_log(&path, &ops);
        append_raw(
            &path,
            &[0xFF, 0xAB, 0x12, 0x99, 0x00, 0xDE, 0xAD, 0xBE, 0xEF],
        );
        assert!(std::fs::metadata(&path).unwrap().len() > good_len);
        assert_recovers(&path, &ops, good_len);
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn crash_recovery_truncated_mid_frame() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log");
        let ops = make_ops();
        let good_len = write_log(&path, &ops);
        // Length prefix promises 100 payload bytes, but only 20 follow.
        append_raw(&path, &100u32.to_le_bytes());
        append_raw(&path, &[0x55u8; 20]);
        assert_recovers(&path, &ops, good_len);
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn crash_recovery_bad_crc_tail() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log");
        let ops = make_ops();
        let good_len = write_log(&path, &ops);
        append_raw(&path, &bad_crc_frame(&create("bad")));
        assert_recovers(&path, &ops, good_len);
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn rewrite_compaction() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log");
        let (mut log, _) = OpLog::open(&path).unwrap();
        for op in &make_ops() {
            log.append(op).unwrap();
        }
        log.sync().unwrap();
        let compacted = vec![
            create("col1"),
            Op::Upsert {
                collection: "col1".into(),
                id: "docX".into(),
                row: 0,
                attrs: BTreeMap::new(),
            },
        ];
        log.rewrite(&compacted).unwrap();
        // The temp file must have been renamed over the log, not left behind.
        assert!(!dir.path().join("log.tmp").exists());
        let (_, replayed) = OpLog::open(&path).unwrap();
        assert_eq!(replayed, compacted);
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn append_after_rewrite() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log");
        let (mut log, _) = OpLog::open(&path).unwrap();
        log.append(&create("c")).unwrap();
        log.sync().unwrap();
        log.rewrite(&[]).unwrap();
        let new_op = Op::Upsert {
            collection: "c".into(),
            id: "x".into(),
            row: 42,
            attrs: BTreeMap::new(),
        };
        log.append(&new_op).unwrap();
        log.sync().unwrap();
        let (_, replayed) = OpLog::open(&path).unwrap();
        assert_eq!(replayed, vec![new_op]);
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn in_memory_append_and_rewrite() {
        let mut log = OpLog::in_memory();
        let op = create("m");
        log.append(&op).unwrap();
        log.sync().unwrap();
        assert_eq!(memory_ops(&log), vec![op]);
        let ops2 = vec![Op::DropCollection {
            collection: "m".into(),
        }];
        log.rewrite(&ops2).unwrap();
        // A rewrite replaces the previous contents rather than appending to them.
        assert_eq!(memory_ops(&log), ops2);
    }
}