use std::fs;
use std::io::{Seek, SeekFrom, Write};
use std::path::PathBuf;
use tempfile::tempdir;
use super::codec::{
crc32, encode_file_header, FileHeader, FILE_HEADER_SIZE, FORMAT_VERSION, RECORD_MAGIC,
};
use super::reader::replay;
use super::wal_op::WalOp;
use super::writer::{WalWriter, AUTO_FLUSH_THRESHOLD};
use crate::api::errors::Error;
/// Returns the location of the WAL file a test works with: a file named
/// `test.wal` inside the given temporary directory.
fn wal_path(dir: &tempfile::TempDir) -> PathBuf {
    let mut path = dir.path().to_path_buf();
    path.push("test.wal");
    path
}
/// Appends one hand-built WAL record to `out`, bypassing `WalWriter`, so tests
/// can craft records the writer itself would never produce.
///
/// Bytes written, in order: the record magic (little-endian), the body length
/// as a little-endian `u32`, `seq` as a little-endian `u64`, the one-byte type
/// tag `ty`, the raw `body`, and finally a little-endian CRC computed over
/// every byte of this record from the magic through the end of the body.
///
/// # Panics
/// Panics if `body` is longer than `u32::MAX` bytes, because the length field
/// cannot represent it (a plain `as u32` cast would silently truncate and
/// produce a corrupt record).
fn append_raw_record(out: &mut Vec<u8>, seq: u64, ty: u8, body: &[u8]) {
    let body_len = u32::try_from(body.len()).expect("record body length must fit in u32");
    let start = out.len();
    out.extend_from_slice(&RECORD_MAGIC.to_le_bytes());
    out.extend_from_slice(&body_len.to_le_bytes());
    out.extend_from_slice(&seq.to_le_bytes());
    out.push(ty);
    out.extend_from_slice(body);
    // Checksum only this record's bytes, not anything already present in `out`.
    let crc = crc32(&out[start..]);
    out.extend_from_slice(&crc.to_le_bytes());
}
/// Builds the fixed operation sequence shared by several tests, covering the
/// `Insert`, `Erase`, and `RenameObject` variants:
/// insert `img/01.jpg`, insert `img/02.jpg`, erase `img/01.jpg`, then a
/// non-forced rename of `img/02.jpg` to `img/02-renamed.jpg`.
fn sample_ops() -> Vec<WalOp> {
    let first_key = b"img/01.jpg";
    let second_key = b"img/02.jpg";

    let insert_first = WalOp::Insert {
        key: first_key.to_vec(),
        value: vec![0xAA; 64],
    };
    let insert_second = WalOp::Insert {
        key: second_key.to_vec(),
        value: vec![0xBB; 64],
    };
    let erase_first = WalOp::Erase {
        key: first_key.to_vec(),
    };
    let rename_second = WalOp::RenameObject {
        src_key: second_key.to_vec(),
        dst_key: b"img/02-renamed.jpg".to_vec(),
        force: false,
    };

    vec![insert_first, insert_second, erase_first, rename_second]
}
/// Writes every sample op with sequence numbers `1..=N`, flushes, then replays
/// the file. It checks the header, the replay stats, and that each decoded op
/// matches what was written (compared through its `Debug` rendering).
#[test]
fn create_open_round_trip_all_variants() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let ops = sample_ops();
    let expected_count = ops.len() as u64;

    let mut writer = WalWriter::create(&path, 42).unwrap();
    // Right after creation the WAL holds nothing but its file header.
    assert_eq!(writer.header().tree_id, 42);
    assert_eq!(writer.bytes_written(), FILE_HEADER_SIZE as u64);
    for (seq, op) in (1u64..).zip(ops.iter()) {
        writer.append(op, seq).unwrap();
    }
    writer.flush().unwrap();

    let mut decoded = Vec::new();
    let (header, stats) = replay(&path, |op, seq, _offset| {
        decoded.push((format!("{op:?}"), seq));
        Ok(())
    })
    .unwrap();

    assert_eq!(header.tree_id, 42);
    assert_eq!(stats.records_seen, expected_count);
    assert_eq!(stats.highest_seq, Some(expected_count));
    assert_eq!(stats.torn_tail_at, None);
    assert_eq!(decoded.len(), ops.len());
    for ((expected_seq, (decoded_dbg, seq)), op) in
        (1u64..).zip(decoded.iter()).zip(ops.iter())
    {
        assert_eq!(*seq, expected_seq);
        assert_eq!(decoded_dbg, &format!("{op:?}"));
    }
}
/// A file whose only record carries record type tag 2 must make `replay` fail.
/// The error must be a `ReplaySanityFailed` whose context mentions "variant"
/// and whose offset points at that record, directly after the file header.
#[test]
fn replay_rejects_removed_structural_tag() {
    // Record type tag this test expects the reader to reject.
    const REMOVED_TAG: u8 = 2;

    let dir = tempdir().unwrap();
    let path = wal_path(&dir);

    let header = FileHeader {
        tree_id: 0,
        created_at: 0,
    };
    let mut bytes = Vec::new();
    encode_file_header(&header, &mut bytes);
    // Empty-bodied record with sequence number 7, placed right after the header.
    append_raw_record(&mut bytes, 7, REMOVED_TAG, &[]);
    fs::write(&path, &bytes).unwrap();

    let outcome = replay(&path, |_, _, _| Ok(()));
    match outcome {
        Err(Error::ReplaySanityFailed {
            context,
            record_offset,
        }) => {
            assert!(context.contains("variant"));
            assert_eq!(record_offset, FILE_HEADER_SIZE as u64);
        }
        other => panic!("expected removed structural tag rejection, got {other:?}"),
    }
}
/// Reopening a WAL with `open_existing` must preserve its header and append
/// after the records already on disk. A replay must then see the records from
/// both writer sessions, in order.
#[test]
fn open_existing_resumes_append_position() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let first = WalOp::Insert {
        key: b"k1".to_vec(),
        value: b"v1".to_vec(),
    };
    let second = WalOp::Erase {
        key: b"k1".to_vec(),
    };
    // First session: create the file and persist one record.
    {
        let mut w = WalWriter::create(&path, 7).unwrap();
        w.append(&first, 1).unwrap();
        w.flush().unwrap();
    }
    // Second session: reopen and append after the existing record.
    {
        let mut w = WalWriter::open_existing(&path).unwrap();
        assert_eq!(w.header().tree_id, 7);
        w.append(&second, 2).unwrap();
        w.flush().unwrap();
    }
    let mut seen = Vec::new();
    let (_h, stats) = replay(&path, |op, seq, _| {
        seen.push((format!("{op:?}"), seq));
        Ok(())
    })
    .unwrap();
    assert_eq!(stats.records_seen, 2);
    assert_eq!(stats.highest_seq, Some(2));
    // The first session's record must survive, and the second must follow it.
    assert_eq!(
        seen,
        vec![(format!("{first:?}"), 1), (format!("{second:?}"), 2)]
    );
}
/// `open_or_create` on an existing WAL with a matching tree id must open that
/// file rather than start a fresh one. A record flushed beforehand is used to
/// tell the two apart: if the file had been recreated, the record would be
/// gone after replay.
#[test]
fn open_or_create_uses_existing_when_present() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    {
        let mut w = WalWriter::create(&path, 99).unwrap();
        w.append(
            &WalOp::Insert {
                key: b"existing".to_vec(),
                value: b"v".to_vec(),
            },
            1,
        )
        .unwrap();
        w.flush().unwrap();
    }
    let w = WalWriter::open_or_create(&path, 99).unwrap();
    assert_eq!(w.header().tree_id, 99);
    drop(w);

    let (_h, stats) = replay(&path, |_, _, _| Ok(())).unwrap();
    assert_eq!(stats.records_seen, 1);
    assert_eq!(stats.highest_seq, Some(1));
}
/// `open_or_create` against an existing WAL written for a different tree id
/// must fail with a `ReplaySanityFailed` whose context mentions `tree_id`.
#[test]
fn open_or_create_rejects_mismatched_tree_id() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    // Create a WAL for tree 99 and release the writer immediately.
    drop(WalWriter::create(&path, 99).unwrap());

    let outcome = WalWriter::open_or_create(&path, 100);
    if let Err(Error::ReplaySanityFailed { context, .. }) = &outcome {
        assert!(context.contains("tree_id"));
    } else {
        panic!("expected tree-id mismatch error, got {outcome:?}");
    }
}
/// A record that is appended but never flushed must not reach the file. After
/// the writer is dropped, the file is exactly one header long, and replay sees
/// no records and no torn tail.
#[test]
fn unflushed_records_are_lost_after_drop() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let transient = WalOp::Insert {
        key: b"transient".to_vec(),
        value: b"never-persisted".to_vec(),
    };

    let mut writer = WalWriter::create(&path, 0).unwrap();
    writer.append(&transient, 1).unwrap();
    // Deliberately no flush before the writer goes away.
    drop(writer);

    let file_len = fs::metadata(&path).unwrap().len();
    assert_eq!(file_len, FILE_HEADER_SIZE as u64);

    let (_header, stats) = replay(&path, |_, _, _| Ok(())).unwrap();
    assert_eq!(stats.records_seen, 0);
    assert_eq!(stats.torn_tail_at, None);
}
/// Cutting the last few bytes off a flushed WAL leaves its final record
/// incomplete. Replay must report a torn tail instead of failing, and it must
/// still deliver every complete record before it, in sequence order.
#[test]
fn torn_tail_is_recovered_gracefully() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let ops = sample_ops();
    // Every record except the last one stays intact.
    let complete = ops.len() as u64 - 1;
    {
        let mut w = WalWriter::create(&path, 0).unwrap();
        for (i, op) in ops.iter().enumerate() {
            w.append(op, i as u64 + 1).unwrap();
        }
        w.flush().unwrap();
    }
    {
        // Chop 8 bytes off the end. The last record loses its 4-byte trailing
        // CRC plus part of its body, simulating an interrupted write.
        let file = fs::OpenOptions::new().write(true).open(&path).unwrap();
        let len = file.metadata().unwrap().len();
        file.set_len(len - 8).unwrap();
    }
    let mut seen = Vec::new();
    let (_h, stats) = replay(&path, |_, seq, _| {
        seen.push(seq);
        Ok(())
    })
    .unwrap();
    assert!(stats.torn_tail_at.is_some());
    // Pin the exact sequence numbers delivered, not just how many there were.
    assert_eq!(seen, (1..=complete).collect::<Vec<_>>());
    assert_eq!(stats.records_seen, complete);
    assert_eq!(stats.highest_seq, Some(complete));
}
/// Corrupting a byte inside a record that is not the last one must surface as
/// a hard `ReplaySanityFailed` error, not a recoverable torn tail. The error's
/// offset must point at or beyond the damaged record.
#[test]
fn mid_file_corruption_propagates_with_offset() {
    // Fixed per-record prefix: magic (4) + body length (4) + seq (8) + tag (1),
    // matching the layout that `append_raw_record` writes.
    const RECORD_PREFIX: usize = 4 + 4 + 8 + 1;
    // Trailing CRC after each record body.
    const RECORD_CRC: usize = 4;
    // Offset into the second record of the byte to flip. It lies 3 bytes into
    // that record's body (the second sample op carries a 64-byte value).
    const CORRUPT_AT: usize = 20;

    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let ops = sample_ops();
    {
        let mut w = WalWriter::create(&path, 0).unwrap();
        for (i, op) in ops.iter().enumerate() {
            w.append(op, i as u64 + 1).unwrap();
        }
        w.flush().unwrap();
    }
    let mut bytes = fs::read(&path).unwrap();
    // The first record's body-length field follows its 4-byte magic.
    let len_pos = FILE_HEADER_SIZE + 4;
    let first_body_len =
        u32::from_le_bytes(bytes[len_pos..len_pos + 4].try_into().unwrap()) as usize;
    let first_record_end = FILE_HEADER_SIZE + RECORD_PREFIX + first_body_len + RECORD_CRC;
    bytes[first_record_end + CORRUPT_AT] ^= 0xFF;
    fs::write(&path, &bytes).unwrap();
    match replay(&path, |_, _, _| Ok(())) {
        Err(Error::ReplaySanityFailed {
            context,
            record_offset,
        }) => {
            // `first_record_end` is strictly positive, so this also proves the
            // offset was patched in, not left at zero.
            assert!(
                record_offset >= first_record_end as u64,
                "offset should be patched in: got {record_offset}, expected >= {first_record_end}",
            );
            assert!(
                context.contains("CRC") || context.contains("magic") || context.contains("variant"),
                "unexpected sanity context: {context}",
            );
        }
        other => panic!("expected mid-file sanity failure, got {other:?}"),
    }
}
/// An error returned by the replay callback must stop replay immediately and
/// be passed back to the caller. With ten records and an error on the fourth
/// callback, the callback runs exactly four times.
#[test]
fn replay_callback_can_short_circuit() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);

    let mut writer = WalWriter::create(&path, 0).unwrap();
    for idx in 0u64..10 {
        let op = WalOp::Insert {
            key: format!("k{idx}").into_bytes(),
            value: vec![idx as u8],
        };
        writer.append(&op, idx + 1).unwrap();
    }
    writer.flush().unwrap();

    let mut calls = 0;
    let result = replay(&path, |_, _, _| {
        calls += 1;
        match calls {
            4 => Err(Error::NotFound),
            _ => Ok(()),
        }
    });
    assert!(matches!(result, Err(Error::NotFound)));
    assert_eq!(calls, 4);
}
/// A header-sized file that starts with a bogus file magic (but a valid format
/// version) must be rejected with a sanity error mentioning "magic".
#[test]
fn rejected_file_with_wrong_magic() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);

    // Zero-filled header; only the magic and version fields are populated.
    let mut header = vec![0u8; FILE_HEADER_SIZE];
    header[..4].copy_from_slice(&0xDEAD_BEEFu32.to_le_bytes());
    header[4..8].copy_from_slice(&FORMAT_VERSION.to_le_bytes());
    fs::write(&path, &header).unwrap();

    let outcome = replay(&path, |_, _, _| Ok(()));
    if let Err(Error::ReplaySanityFailed { context, .. }) = &outcome {
        assert!(context.contains("magic"));
    } else {
        panic!("expected magic mismatch, got {outcome:?}");
    }
}
/// A header-sized file with the correct file magic but format version 999 must
/// be rejected with a sanity error mentioning "version".
#[test]
fn rejected_file_with_unsupported_version() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);

    // Zero-filled header; only the magic and version fields are populated.
    let mut header = vec![0u8; FILE_HEADER_SIZE];
    header[..4].copy_from_slice(&super::codec::FILE_MAGIC.to_le_bytes());
    header[4..8].copy_from_slice(&999u32.to_le_bytes());
    fs::write(&path, &header).unwrap();

    let outcome = replay(&path, |_, _, _| Ok(()));
    if let Err(Error::ReplaySanityFailed { context, .. }) = &outcome {
        assert!(context.contains("version"));
    } else {
        panic!("expected version mismatch, got {outcome:?}");
    }
}
/// `discard_pending` must drop only what was appended after the last flush.
/// The first, flushed record survives; the second, discarded one does not.
#[test]
fn discard_pending_keeps_already_flushed_records() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let durable = WalOp::Insert {
        key: b"k1".to_vec(),
        value: b"v1".to_vec(),
    };
    let discarded = WalOp::Insert {
        key: b"k2".to_vec(),
        value: b"v2".to_vec(),
    };

    let mut writer = WalWriter::create(&path, 0).unwrap();
    writer.append(&durable, 1).unwrap();
    writer.flush().unwrap();
    writer.append(&discarded, 2).unwrap();
    // Throw away the second record, which was appended after the flush.
    writer.discard_pending();
    drop(writer);

    let (_header, stats) = replay(&path, |_, _, _| Ok(())).unwrap();
    assert_eq!(stats.records_seen, 1);
    assert_eq!(stats.highest_seq, Some(1));
}
/// A WAL that was created and flushed without any appends must replay cleanly.
/// Replay returns the header and reports zero records, no highest sequence
/// number, and no torn tail.
#[test]
fn empty_wal_file_after_header_only() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    {
        let mut writer = WalWriter::create(&path, 0).unwrap();
        writer.flush().unwrap();
    }

    let (header, stats) = replay(&path, |_, _, _| Ok(())).unwrap();
    assert_eq!(header.tree_id, 0);
    assert_eq!(stats.records_seen, 0);
    assert_eq!(stats.highest_seq, None);
    assert_eq!(stats.torn_tail_at, None);
}
/// `truncate` on a live writer must shrink the file back to just its header
/// and reset `bytes_written`. Later appends then land in the same file, and
/// only the post-truncate record replays.
#[test]
fn truncate_reuses_live_wal_file_in_place() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let before = WalOp::Insert {
        key: b"before-truncate".to_vec(),
        value: b"v".to_vec(),
    };
    let after = WalOp::Insert {
        key: b"after-truncate".to_vec(),
        value: b"v2".to_vec(),
    };

    let mut w = WalWriter::create(&path, 0).unwrap();
    w.append(&before, 1).unwrap();
    w.flush().unwrap();
    assert!(fs::metadata(&path).unwrap().len() > FILE_HEADER_SIZE as u64);

    w.truncate().unwrap();
    assert_eq!(w.bytes_written(), FILE_HEADER_SIZE as u64);
    assert_eq!(fs::metadata(&path).unwrap().len(), FILE_HEADER_SIZE as u64);

    w.append(&after, 2).unwrap();
    w.flush().unwrap();
    drop(w);

    let mut seen = Vec::new();
    let (header, stats) = replay(&path, |op, seq, _| {
        seen.push((seq, format!("{op:?}")));
        Ok(())
    })
    .unwrap();
    assert_eq!(header.tree_id, 0);
    assert_eq!(stats.records_seen, 1);
    assert_eq!(stats.highest_seq, Some(2));
    // The only surviving record must be the one appended after truncation.
    assert_eq!(seen, vec![(2, format!("{after:?}"))]);
}
/// Streams `N` inserts with sequence numbers `1..=N` through one writer, then
/// checks that replay sees all of them and that the largest sequence number
/// observed by the callback matches the stats.
#[test]
fn many_records_stream_round_trip() {
    const N: u64 = 200;
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);

    let mut writer = WalWriter::create(&path, 0).unwrap();
    (1..=N).for_each(|seq| {
        let op = WalOp::Insert {
            key: format!("k{seq:04}").into_bytes(),
            value: format!("v{seq}").into_bytes(),
        };
        writer.append(&op, seq).unwrap();
    });
    writer.flush().unwrap();
    drop(writer);

    let mut largest_seen = 0u64;
    let (_header, stats) = replay(&path, |_, seq, _| {
        if seq > largest_seen {
            largest_seen = seq;
        }
        Ok(())
    })
    .unwrap();
    assert_eq!(stats.records_seen, N);
    assert_eq!(stats.highest_seq, Some(N));
    assert_eq!(largest_seen, N);
}
/// Appends well past `AUTO_FLUSH_THRESHOLD` without an explicit flush. Bytes
/// must already have reached disk, and the unflushed tail (`bytes_written()`
/// minus the on-disk length) must stay bounded. After a final flush, every
/// record must replay in order.
#[test]
fn auto_flush_keeps_user_space_buffer_bounded() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let mut w = WalWriter::create(&path, 0).unwrap();
    // 80 is presumably a rough per-record size estimate for the key/value used
    // below, so this aims for about three thresholds' worth of data.
    let target_records = (AUTO_FLUSH_THRESHOLD / 80) * 3;
    let record_count = target_records as u64;
    for i in 0..record_count {
        w.append(
            &WalOp::Insert {
                key: format!("k{i:06}").into_bytes(),
                value: vec![0xAB; 32],
            },
            i + 1,
        )
        .unwrap();
    }
    let on_disk_before_flush = fs::metadata(&path).unwrap().len();
    assert!(
        on_disk_before_flush > AUTO_FLUSH_THRESHOLD as u64,
        "auto-flush should have pushed bytes to disk: on-disk = {on_disk_before_flush}",
    );
    // 256 bytes of slack above the threshold for the unflushed tail.
    let pending_upper_bound = AUTO_FLUSH_THRESHOLD + 256;
    let bytes_written_total = w.bytes_written();
    // Checked so that an inconsistent writer produces a clear message instead
    // of a bare overflow panic (debug) or a wrapped huge value (release).
    let pending_size = bytes_written_total
        .checked_sub(on_disk_before_flush)
        .unwrap_or_else(|| {
            panic!(
                "bytes_written() ({bytes_written_total}) is smaller than the on-disk length ({on_disk_before_flush})"
            )
        });
    assert!(
        pending_size <= pending_upper_bound as u64,
        "pending tail should be bounded: {pending_size} bytes",
    );
    w.flush().unwrap();
    drop(w);
    let mut seen = Vec::new();
    let (_h, stats) = replay(&path, |_, seq, _| {
        seen.push(seq);
        Ok(())
    })
    .unwrap();
    assert_eq!(stats.records_seen, record_count);
    assert_eq!(stats.highest_seq, Some(record_count));
    assert_eq!(stats.torn_tail_at, None);
    // Every record must come back exactly once, in append order.
    assert_eq!(seen, (1..=record_count).collect::<Vec<_>>());
}
/// Simulates an external actor truncating the WAL back to just its header
/// while a writer is still open. The writer's next flushed append must make
/// the file grow again, and replay must end with that new record.
#[test]
fn appending_after_external_truncate_grows_file_again() {
    let dir = tempdir().unwrap();
    let path = wal_path(&dir);
    let mut w = WalWriter::create(&path, 0).unwrap();
    w.append(
        &WalOp::Insert {
            key: b"keep".to_vec(),
            value: b"v".to_vec(),
        },
        1,
    )
    .unwrap();
    w.flush().unwrap();

    // Truncate through an independent handle, closed before the writer is used
    // again. The seek and flush act only on this external handle's own cursor
    // and buffer; they do not touch the writer's handle.
    {
        let mut external = fs::OpenOptions::new().write(true).open(&path).unwrap();
        external.set_len(FILE_HEADER_SIZE as u64).unwrap();
        external
            .seek(SeekFrom::Start(FILE_HEADER_SIZE as u64))
            .unwrap();
        external.flush().unwrap();
    }
    assert_eq!(fs::metadata(&path).unwrap().len(), FILE_HEADER_SIZE as u64);

    w.append(
        &WalOp::Insert {
            key: b"after-truncate".to_vec(),
            value: b"v".to_vec(),
        },
        2,
    )
    .unwrap();
    w.flush().unwrap();
    drop(w);

    // Check the property the test name promises: the file grew past the header.
    assert!(fs::metadata(&path).unwrap().len() > FILE_HEADER_SIZE as u64);

    let mut seqs = Vec::new();
    let (_header, stats) = replay(&path, |_, seq, _| {
        seqs.push(seq);
        Ok(())
    })
    .unwrap();
    assert_eq!(seqs.last().copied(), Some(2));
    assert_eq!(stats.highest_seq, Some(2));
}