use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use tempfile::TempDir;
use walogs::{Lsn, MAX_ENTRY_SIZE, TailState, Wal, WalConfig, WalError};
/// Reads every entry yielded by `wal.iter()` into a vector of `(Lsn, payload)` pairs.
///
/// # Panics
/// Panics with the message "io error" as soon as the iterator yields an `Err`.
fn collect(wal: &Wal) -> Vec<(Lsn, Vec<u8>)> {
    let mut entries = Vec::new();
    for item in wal.iter() {
        entries.push(item.expect("io error"));
    }
    entries
}
/// W1: opening a WAL in a fresh temporary directory creates `wal-000001.log`
/// and reports next LSN 1, a clean tail, and no entries.
#[test]
fn w1_open_empty_dir_creates_file() {
    let tmp = TempDir::new().unwrap();
    let wal = Wal::open(tmp.path()).unwrap();

    let first_segment = tmp.path().join("wal-000001.log");
    assert!(first_segment.exists());

    assert_eq!(wal.next_lsn(), Lsn(1));
    assert_eq!(wal.tail_state(), TailState::Clean);
    assert!(collect(&wal).is_empty());
}
/// W2: three appends through one handle return LSNs 1, 2, 3, and iterating
/// that same handle yields the three payloads in append order.
#[test]
fn w2_append_then_iter_same_handle() {
    let tmp = TempDir::new().unwrap();
    let mut wal = Wal::open(tmp.path()).unwrap();

    let first = wal.append(b"one").unwrap();
    assert_eq!(first, Lsn(1));
    let second = wal.append(b"two").unwrap();
    assert_eq!(second, Lsn(2));
    let third = wal.append(b"three").unwrap();
    assert_eq!(third, Lsn(3));

    // Comparing the whole vector checks both the count and every element.
    let expected = vec![
        (Lsn(1), b"one".to_vec()),
        (Lsn(2), b"two".to_vec()),
        (Lsn(3), b"three".to_vec()),
    ];
    assert_eq!(collect(&wal), expected);
}
/// W3: entries appended before the handle is dropped are still there after
/// reopening the same directory; the reopened WAL is clean and continues at LSN 4.
#[test]
fn w3_append_drop_reopen_iter() {
    let tmp = TempDir::new().unwrap();
    {
        // Writer handle is scoped so it is dropped before the reopen below.
        let mut writer = Wal::open(tmp.path()).unwrap();
        let payloads: [&[u8]; 3] = [b"one", b"two", b"three"];
        for payload in &payloads {
            writer.append(*payload).unwrap();
        }
    }

    let reopened = Wal::open(tmp.path()).unwrap();
    assert_eq!(reopened.next_lsn(), Lsn(4));
    assert_eq!(reopened.tail_state(), TailState::Clean);

    let entries = collect(&reopened);
    assert_eq!(entries.len(), 3);
    let (first_lsn, _) = &entries[0];
    assert_eq!(*first_lsn, Lsn(1));
    let (_, last_payload) = &entries[2];
    assert_eq!(*last_payload, b"three".to_vec());
}
/// W4: inverting the last byte of the segment makes the next open report
/// `TruncatedAt(38)`, physically shrink the file to 38 bytes, and keep only
/// the first two entries.
///
/// The expected sizes (57 bytes for three 3-byte payloads, third frame at
/// offset 38) imply 19 bytes per frame.
#[test]
fn w4_corrupt_last_byte_truncates_on_open() {
    let tmp = TempDir::new().unwrap();
    let segment = tmp.path().join("wal-000001.log");
    {
        let mut writer = Wal::open(tmp.path()).unwrap();
        for payload in &[b"aaa", b"bbb", b"ccc"] {
            writer.append(*payload).unwrap();
        }
    }

    let full_len = fs::metadata(&segment).unwrap().len();
    assert_eq!(full_len, 57);
    let third_frame_offset: u64 = 38;

    {
        // Read the final byte, then write back its bitwise complement so the
        // stored value is guaranteed to change.
        let last_byte_pos = SeekFrom::Start(full_len - 1);
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&segment)
            .unwrap();
        let mut last = [0u8; 1];
        file.seek(last_byte_pos).unwrap();
        file.read_exact(&mut last).unwrap();
        file.seek(last_byte_pos).unwrap();
        file.write_all(&[!last[0]]).unwrap();
        file.sync_all().unwrap();
    }

    let reopened = Wal::open(tmp.path()).unwrap();
    assert_eq!(
        reopened.tail_state(),
        TailState::TruncatedAt(third_frame_offset)
    );
    let len_after_open = fs::metadata(&segment).unwrap().len();
    assert_eq!(
        len_after_open, third_frame_offset,
        "file must be physically truncated (I2)"
    );

    let surviving = vec![(Lsn(1), b"aaa".to_vec()), (Lsn(2), b"bbb".to_vec())];
    assert_eq!(collect(&reopened), surviving);
    assert_eq!(reopened.next_lsn(), Lsn(3));
}
/// W5: after the last byte of the segment is overwritten with `0xFF`, a reopen
/// continues at LSN 3; three further appends followed by another reopen give
/// a clean log of five entries with LSNs 1..=5.
#[test]
fn w5_append_after_truncation_no_buried_garbage() {
    let tmp = TempDir::new().unwrap();
    let segment = tmp.path().join("wal-000001.log");
    {
        let mut writer = Wal::open(tmp.path()).unwrap();
        for payload in &[b"aaa", b"bbb", b"ccc"] {
            writer.append(*payload).unwrap();
        }
    }
    {
        // Overwrite the final byte of the file (the last payload byte is b'c',
        // so 0xFF always changes it).
        let mut file = OpenOptions::new().write(true).open(&segment).unwrap();
        file.seek(SeekFrom::End(-1)).unwrap();
        file.write_all(&[0xFF]).unwrap();
        file.sync_all().unwrap();
    }
    {
        let mut recovered = Wal::open(tmp.path()).unwrap();
        assert_eq!(recovered.next_lsn(), Lsn(3));
        for payload in &[b"ddd", b"eee", b"fff"] {
            recovered.append(*payload).unwrap();
        }
    }

    let reopened = Wal::open(tmp.path()).unwrap();
    assert_eq!(reopened.tail_state(), TailState::Clean);

    let entries = collect(&reopened);
    assert_eq!(entries.len(), 5);
    let lsns: Vec<_> = entries.iter().map(|entry| (entry.0).0).collect();
    assert_eq!(lsns, vec![1, 2, 3, 4, 5]);
    assert_eq!(entries[2].1, b"ddd".to_vec());
    assert_eq!(entries[4].1, b"fff".to_vec());
    assert_eq!(reopened.next_lsn(), Lsn(6));
}
/// W6: appending `MAX_ENTRY_SIZE + 1` bytes fails with
/// `WalError::EntryTooLarge` carrying the attempted size and the limit, and
/// leaves both `next_lsn` and the segment file size unchanged.
#[test]
fn w6_append_oversize_returns_error() {
    let tmp = TempDir::new().unwrap();
    let segment = tmp.path().join("wal-000001.log");
    let mut wal = Wal::open(tmp.path()).unwrap();

    // Snapshot the state that a rejected append must not change.
    let len_before = fs::metadata(&segment).unwrap().len();
    let lsn_before = wal.next_lsn();

    let oversized = vec![0u8; MAX_ENTRY_SIZE + 1];
    let outcome = wal.append(&oversized);
    match outcome {
        Err(WalError::EntryTooLarge { size, max }) => {
            assert_eq!(size, MAX_ENTRY_SIZE + 1);
            assert_eq!(max, MAX_ENTRY_SIZE);
        }
        unexpected => panic!("expected EntryTooLarge, got {:?}", unexpected),
    }

    assert_eq!(wal.next_lsn(), lsn_before, "next_lsn must not advance");
    let len_after = fs::metadata(&segment).unwrap().len();
    assert_eq!(len_after, len_before, "file size must be unchanged");
}
/// W7: a payload of exactly `MAX_ENTRY_SIZE` bytes is accepted as LSN 1 and
/// reads back byte-for-byte after a reopen.
#[test]
fn w7_append_at_max_size_succeeds() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    let max_payload = vec![0xCCu8; MAX_ENTRY_SIZE];
    {
        let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
        assert_eq!(writer.append(&max_payload).unwrap(), Lsn(1));
    }

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    let entries = collect(&reopened);
    assert_eq!(entries.len(), 1);

    let (lsn, data) = &entries[0];
    assert_eq!(*lsn, Lsn(1));
    assert_eq!(data.len(), MAX_ENTRY_SIZE);
    assert_eq!(data, &max_payload);
}
/// W8: ten open → append → drop cycles hand out LSNs 1..=10 in order; a final
/// reopen sees all ten `e-<n>` payloads and reports next LSN 11.
#[test]
fn w8_multiple_open_close_cycles() {
    let tmp = TempDir::new().unwrap();
    for round in 1u64..=10 {
        // A fresh handle per round; it is dropped at the end of the iteration.
        let mut wal = Wal::open(tmp.path()).unwrap();
        let assigned = wal.append(format!("e-{}", round).as_bytes()).unwrap();
        assert_eq!(assigned, Lsn(round));
    }

    let reopened = Wal::open(tmp.path()).unwrap();
    let entries = collect(&reopened);
    assert_eq!(entries.len(), 10);
    for (expected_lsn, (lsn, data)) in (1u64..).zip(entries.iter()) {
        assert_eq!(lsn.0, expected_lsn);
        assert_eq!(data, format!("e-{}", expected_lsn).as_bytes());
    }
    assert_eq!(reopened.next_lsn(), Lsn(11));
}
/// W9: a freshly opened WAL has a clean tail, yields no entries, and reports
/// next LSN 1.
#[test]
fn w9_iter_on_empty_wal() {
    let tmp = TempDir::new().unwrap();
    let wal = Wal::open(tmp.path()).unwrap();

    assert_eq!(wal.tail_state(), TailState::Clean);
    let entries = collect(&wal);
    assert!(entries.is_empty());
    assert_eq!(wal.next_lsn(), Lsn(1));
}
/// W10: a segment containing only a bogus 16-byte prefix is reported as
/// `TruncatedAt(0)`, is physically truncated to zero bytes, and yields no entries.
#[test]
fn w10_corrupt_first_header_zero_entries() {
    let tmp = TempDir::new().unwrap();
    let segment = tmp.path().join("wal-000001.log");
    {
        // Little-endian u32::MAX, u32 zero, u64 zero.
        // NOTE(review): presumably these map onto the frame header fields —
        // confirm against the on-disk format.
        let first_word = u32::MAX.to_le_bytes();
        let second_word = 0u32.to_le_bytes();
        let trailing_quad = 0u64.to_le_bytes();
        let bogus: Vec<u8> = [&first_word[..], &second_word[..], &trailing_quad[..]].concat();

        let mut file = File::create(&segment).unwrap();
        file.write_all(&bogus).unwrap();
        file.sync_all().unwrap();
    }

    let wal = Wal::open(tmp.path()).unwrap();
    assert_eq!(wal.tail_state(), TailState::TruncatedAt(0));
    let len_after_open = fs::metadata(&segment).unwrap().len();
    assert_eq!(len_after_open, 0, "file must be truncated to 0 bytes (I2)");
    assert!(collect(&wal).is_empty());
    assert_eq!(wal.next_lsn(), Lsn(1));
}
/// W11: three appends, an explicit rotate, and three more appends leave two
/// segment files; a reopen is clean and iterates all six entries with LSNs 1..=6.
#[test]
fn w11_rotate_then_reopen_iter() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    {
        let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
        for payload in &[b"a1", b"a2", b"a3"] {
            writer.append(*payload).unwrap();
        }
        writer.rotate().unwrap();
        for payload in &[b"b1", b"b2", b"b3"] {
            writer.append(*payload).unwrap();
        }
        assert_eq!(writer.segment_count(), 2);
    }

    let first_segment = tmp.path().join("wal-000001.log");
    let second_segment = tmp.path().join("wal-000002.log");
    assert!(first_segment.exists());
    assert!(second_segment.exists());

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    assert_eq!(reopened.tail_state(), TailState::Clean);
    assert_eq!(reopened.next_lsn(), Lsn(7));
    assert_eq!(reopened.segment_count(), 2);

    let entries = collect(&reopened);
    assert_eq!(entries.len(), 6);
    let lsns: Vec<_> = entries.iter().map(|entry| (entry.0).0).collect();
    assert_eq!(lsns, vec![1, 2, 3, 4, 5, 6]);
    assert_eq!(entries[0].1, b"a1".to_vec());
    assert_eq!(entries[3].1, b"b1".to_vec());
}
/// W12: with three segments on disk, `checkpoint(Lsn(3))` removes only
/// `wal-000001.log`, and `checkpoint(Lsn(6))` then removes only
/// `wal-000002.log`, leaving the single entry `(Lsn(7), "c1")`.
#[test]
fn w12_checkpoint_deletes_completed_segments() {
    let tmp = TempDir::new().unwrap();
    let seg1 = tmp.path().join("wal-000001.log");
    let seg2 = tmp.path().join("wal-000002.log");
    let seg3 = tmp.path().join("wal-000003.log");

    let cfg = WalConfig {
        max_segment_size: None,
    };
    let mut wal = Wal::open_with_config(tmp.path(), cfg).unwrap();

    // Three entries, rotate, three more entries, rotate, one final entry.
    for payload in &[b"a1", b"a2", b"a3"] {
        wal.append(*payload).unwrap();
    }
    wal.rotate().unwrap();
    for payload in &[b"b1", b"b2", b"b3"] {
        wal.append(*payload).unwrap();
    }
    wal.rotate().unwrap();
    wal.append(b"c1").unwrap();
    assert_eq!(wal.segment_count(), 3);

    let removed = wal.checkpoint(Lsn(3)).unwrap();
    assert_eq!(removed, 1);
    assert!(!seg1.exists());
    assert!(seg2.exists());
    assert!(seg3.exists());
    assert_eq!(wal.segment_count(), 2);

    let removed = wal.checkpoint(Lsn(6)).unwrap();
    assert_eq!(removed, 1);
    assert!(!seg2.exists());
    assert!(seg3.exists());
    assert_eq!(wal.segment_count(), 1);

    assert_eq!(collect(&wal), vec![(Lsn(7), b"c1".to_vec())]);
}
/// W13: with `max_segment_size` set to 60, six `entry-NNNN` appends end up
/// spread over three segment files, and a reopen reads all six back in order.
#[test]
fn w13_auto_rotation() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: Some(60),
    };

    let mut wal = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
    for i in 1..=6 {
        wal.append(format!("entry-{:04}", i).as_bytes()).unwrap();
    }
    assert_eq!(wal.segment_count(), 3);
    for name in &["wal-000001.log", "wal-000002.log", "wal-000003.log"] {
        assert!(tmp.path().join(name).exists());
    }
    drop(wal);

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    let entries = collect(&reopened);
    assert_eq!(entries.len(), 6);
    for (expected, (lsn, data)) in (1u64..).zip(entries.iter()) {
        assert_eq!(lsn.0, expected);
        assert_eq!(data, format!("entry-{:04}", expected).as_bytes());
    }
}
/// W14: inverting the first byte of `wal-000001.log` while a second segment
/// exists makes the open fail with an error whose display text contains
/// "corrupt".
#[test]
fn w14_corrupt_non_last_segment_fails() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    {
        let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
        writer.append(b"aaa").unwrap();
        writer.append(b"bbb").unwrap();
        writer.rotate().unwrap();
        writer.append(b"ccc").unwrap();
    }

    let first_segment = tmp.path().join("wal-000001.log");
    {
        // Read byte 0 and write back its bitwise complement.
        let start = SeekFrom::Start(0);
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&first_segment)
            .unwrap();
        let mut head = [0u8; 1];
        file.seek(start).unwrap();
        file.read_exact(&mut head).unwrap();
        file.seek(start).unwrap();
        file.write_all(&[!head[0]]).unwrap();
        file.sync_all().unwrap();
    }

    let outcome = Wal::open_with_config(tmp.path(), cfg);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("corrupt"),
        "expected corruption error, got: {}",
        err
    );
}
/// W15: when the only log file is named `wal.log`, opening the directory
/// leaves it as `wal-000001.log` (and no `wal.log`), with both entries intact
/// and next LSN 3.
#[test]
fn w15_v1_migration() {
    let tmp = TempDir::new().unwrap();
    {
        let mut writer = Wal::open(tmp.path()).unwrap();
        writer.append(b"hello").unwrap();
        writer.append(b"world").unwrap();
    }

    // Fake the legacy layout by renaming the segment to `wal.log`.
    let legacy_path = tmp.path().join("wal.log");
    let segment_path = tmp.path().join("wal-000001.log");
    fs::rename(&segment_path, &legacy_path).unwrap();
    assert!(legacy_path.exists());
    assert!(!segment_path.exists());

    let migrated = Wal::open(tmp.path()).unwrap();
    assert!(!legacy_path.exists());
    assert!(segment_path.exists());
    assert_eq!(migrated.next_lsn(), Lsn(3));

    let expected = vec![(Lsn(1), b"hello".to_vec()), (Lsn(2), b"world".to_vec())];
    assert_eq!(collect(&migrated), expected);
}
/// W16: a directory holding both `wal.log` and `wal-000001.log` cannot be
/// opened; the error's display text must mention "wal.log".
#[test]
fn w16_mixed_v1_v2_error() {
    let tmp = TempDir::new().unwrap();
    for name in &["wal.log", "wal-000001.log"] {
        // Empty files suffice: only their presence matters for this check.
        File::create(tmp.path().join(name)).unwrap();
    }

    let outcome = Wal::open(tmp.path());
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("wal.log"),
        "expected mixed-version error, got: {}",
        err
    );
}
/// W17: after `checkpoint(Lsn(2))` has run on a two-segment log, a reopen is
/// clean, continues at LSN 5, and iterates only `(3, "c")` and `(4, "d")`.
#[test]
fn w17_checkpoint_then_reopen() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    {
        let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
        writer.append(b"a").unwrap();
        writer.append(b"b").unwrap();
        writer.rotate().unwrap();
        writer.append(b"c").unwrap();
        writer.append(b"d").unwrap();
        writer.checkpoint(Lsn(2)).unwrap();
    }

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    assert_eq!(reopened.next_lsn(), Lsn(5));
    assert_eq!(reopened.tail_state(), TailState::Clean);

    let expected = vec![(Lsn(3), b"c".to_vec()), (Lsn(4), b"d".to_vec())];
    assert_eq!(collect(&reopened), expected);
}
/// W18: rotating before anything has been appended keeps the segment count at
/// 1 and creates no `wal-000002.log`; a subsequent append is still readable.
#[test]
fn w18_rotate_empty_segment_noop() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    let mut wal = Wal::open_with_config(tmp.path(), cfg).unwrap();

    wal.rotate().unwrap();
    assert_eq!(wal.segment_count(), 1);
    let second_segment = tmp.path().join("wal-000002.log");
    assert!(!second_segment.exists());

    wal.append(b"test").unwrap();
    let entries = collect(&wal);
    assert_eq!(entries.len(), 1);
}
/// W19: with a single segment on disk, `checkpoint(Lsn(100))` deletes nothing;
/// the file stays in place and its one entry is still readable.
#[test]
fn w19_checkpoint_never_deletes_active() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    let mut wal = Wal::open_with_config(tmp.path(), cfg).unwrap();
    wal.append(b"only").unwrap();

    // The checkpoint LSN is far beyond anything that was written.
    let removed = wal.checkpoint(Lsn(100)).unwrap();
    assert_eq!(removed, 0);

    let only_segment = tmp.path().join("wal-000001.log");
    assert!(only_segment.exists());
    let entries = collect(&wal);
    assert_eq!(entries.len(), 1);
}
/// W20: five segments of two entries each, then `checkpoint(Lsn(6))`, removes
/// three segments; a reopen iterates the remaining four entries, LSN 7 through 10.
#[test]
fn w20_multiple_rotations_and_checkpoints() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    let mut wal = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();

    for segment_no in 1..=5 {
        // Rotate between segments, but not before the first one.
        if segment_no > 1 {
            wal.rotate().unwrap();
        }
        wal.append(format!("s{}-a", segment_no).as_bytes()).unwrap();
        wal.append(format!("s{}-b", segment_no).as_bytes()).unwrap();
    }
    assert_eq!(wal.segment_count(), 5);

    let removed = wal.checkpoint(Lsn(6)).unwrap();
    assert_eq!(removed, 3);
    assert_eq!(wal.segment_count(), 2);
    drop(wal);

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    let entries = collect(&reopened);
    assert_eq!(entries.len(), 4);
    let (first_lsn, _) = &entries[0];
    let (last_lsn, _) = &entries[3];
    assert_eq!(*first_lsn, Lsn(7));
    assert_eq!(*last_lsn, Lsn(10));
}
/// W21: one entry per segment across two rotations still reads back as LSNs
/// 1, 2, 3 in order after a reopen.
#[test]
fn w21_lsn_continuity_across_rotations() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };

    let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
    writer.append(b"x").unwrap();
    writer.rotate().unwrap();
    writer.append(b"y").unwrap();
    writer.rotate().unwrap();
    writer.append(b"z").unwrap();
    drop(writer);

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    let expected = vec![
        (Lsn(1), b"x".to_vec()),
        (Lsn(2), b"y".to_vec()),
        (Lsn(3), b"z".to_vec()),
    ];
    assert_eq!(collect(&reopened), expected);
}
/// W22: overwriting the last byte of the second (final) segment drops only
/// its last entry on reopen: three entries remain and the tail state is
/// `TruncatedAt(18)`.
#[test]
fn w22_corrupt_last_segment_tail() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    {
        let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
        writer.append(b"a1").unwrap();
        writer.append(b"a2").unwrap();
        writer.rotate().unwrap();
        writer.append(b"b1").unwrap();
        writer.append(b"b2").unwrap();
    }

    let second_segment = tmp.path().join("wal-000002.log");
    {
        // Two 2-byte payloads are expected to occupy 36 bytes in total.
        let seg_len = fs::metadata(&second_segment).unwrap().len();
        assert_eq!(seg_len, 36);

        let mut file = OpenOptions::new()
            .write(true)
            .open(&second_segment)
            .unwrap();
        file.seek(SeekFrom::Start(seg_len - 1)).unwrap();
        file.write_all(&[0xFF]).unwrap();
        file.sync_all().unwrap();
    }

    let reopened = Wal::open_with_config(tmp.path(), cfg).unwrap();
    let expected = vec![
        (Lsn(1), b"a1".to_vec()),
        (Lsn(2), b"a2".to_vec()),
        (Lsn(3), b"b1".to_vec()),
    ];
    assert_eq!(collect(&reopened), expected);
    assert_eq!(reopened.tail_state(), TailState::TruncatedAt(18));
}
/// W23: deleting the middle segment of three makes the open fail with
/// `WalError::Corrupt` whose reason contains "gap".
#[test]
fn w23_segment_sequence_gap_detected() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    {
        let mut writer = Wal::open_with_config(tmp.path(), cfg.clone()).unwrap();
        writer.append(b"a").unwrap();
        writer.rotate().unwrap();
        writer.append(b"b").unwrap();
        writer.rotate().unwrap();
        writer.append(b"c").unwrap();
    }

    // Remove the middle segment so only segments 1 and 3 remain.
    let middle_segment = tmp.path().join("wal-000002.log");
    fs::remove_file(middle_segment).unwrap();

    let err = Wal::open_with_config(tmp.path(), cfg).unwrap_err();
    if let WalError::Corrupt { reason } = &err {
        assert!(
            reason.contains("gap"),
            "expected 'gap' in error message, got: {}",
            reason
        );
    } else {
        panic!("expected WalError::Corrupt, got: {:?}", err);
    }
}
/// W24: passing the path of a regular file to `Wal::open` yields `WalError::Io`.
#[test]
fn w24_open_on_file_not_dir() {
    let scratch = TempDir::new().unwrap();
    let plain_file = scratch.path().join("not-a-dir");
    File::create(&plain_file).unwrap();

    let err = Wal::open(&plain_file).unwrap_err();
    let is_io_error = matches!(err, WalError::Io(_));
    assert!(is_io_error, "expected WalError::Io, got: {:?}", err);
}
/// W25: `checkpoint(Lsn(0))` on a two-segment log deletes nothing and leaves
/// both segment files on disk.
#[test]
fn w25_checkpoint_lsn_zero_is_noop() {
    let tmp = TempDir::new().unwrap();
    let cfg = WalConfig {
        max_segment_size: None,
    };
    let mut wal = Wal::open_with_config(tmp.path(), cfg).unwrap();
    wal.append(b"a").unwrap();
    wal.rotate().unwrap();
    wal.append(b"b").unwrap();

    let removed = wal.checkpoint(Lsn(0)).unwrap();
    assert_eq!(removed, 0, "checkpoint(Lsn(0)) must delete nothing");

    let first_segment = tmp.path().join("wal-000001.log");
    let second_segment = tmp.path().join("wal-000002.log");
    assert!(first_segment.exists());
    assert!(second_segment.exists());
}
/// W26: when the active segment has a corrupted tail and is read-only
/// (mode `0o444`), `Wal::open` must fail with `WalError::Io`.
///
/// Unix-only: the test relies on `std::os::unix::fs::PermissionsExt`, so it is
/// compiled out on other targets instead of breaking the build there.
///
/// The test skips itself when the read-only mode is not enforced for the
/// current user (e.g. running as root), because the scenario cannot be
/// reproduced in that environment.
#[cfg(unix)]
#[test]
fn w26_readonly_active_segment_truncation_error() {
    use std::os::unix::fs::PermissionsExt;
    let dir = TempDir::new().unwrap();
    let seg_path = dir.path().join("wal-000001.log");
    {
        let mut wal = Wal::open(dir.path()).unwrap();
        wal.append(b"hello").unwrap();
    }
    {
        // Overwrite the last byte of the file (the payload ends in b'o', so
        // 0xFF always changes it).
        let size = fs::metadata(&seg_path).unwrap().len();
        let mut f = OpenOptions::new().write(true).open(&seg_path).unwrap();
        f.seek(SeekFrom::Start(size - 1)).unwrap();
        f.write_all(&[0xFF]).unwrap();
        f.sync_all().unwrap();
    }
    fs::set_permissions(&seg_path, fs::Permissions::from_mode(0o444)).unwrap();

    // Privileged users bypass file mode bits. If the file can still be opened
    // for writing, the read-only condition is not in effect and the assertion
    // below would fail for reasons unrelated to the WAL.
    let mode_not_enforced = OpenOptions::new().write(true).open(&seg_path).is_ok();
    if mode_not_enforced {
        fs::set_permissions(&seg_path, fs::Permissions::from_mode(0o644)).unwrap();
        eprintln!("skipping w26: read-only file mode is not enforced for this user");
        return;
    }

    let result = Wal::open(dir.path());
    // Restore write permission before asserting, so the file is writable
    // again even when the assertion fails.
    fs::set_permissions(&seg_path, fs::Permissions::from_mode(0o644)).unwrap();
    assert!(
        matches!(result, Err(WalError::Io(_))),
        "expected WalError::Io when truncating read-only segment, got: {:?}",
        result
    );
}