use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use datawal::{RecordLog, RecordLogReader, RecordType};
use tempfile::tempdir;
/// Runs one full `scan_iter` pass over `reader` and returns every record as an
/// owned `(record_type, key, payload)` tuple, in the order the iterator yields them.
///
/// Panics if `scan_iter` itself fails or if any yielded item is an error, so it
/// is only meant for logs the caller expects to be entirely valid.
fn collect(reader: &RecordLogReader) -> Vec<(RecordType, Vec<u8>, Vec<u8>)> {
    reader
        .scan_iter()
        .expect("scan_iter")
        .map(|item| {
            let record = item.expect("record without error");
            (record.record_type, record.key, record.payload)
        })
        .collect()
}
/// Path of segment file `id` inside `dir`: the id zero-padded to at least eight
/// digits followed by a `.dwal` extension (e.g. id 1 -> `00000001.dwal`).
fn seg_path(dir: &std::path::Path, id: u32) -> std::path::PathBuf {
    let file_name = format!("{id:08}.dwal");
    dir.join(file_name)
}
/// Opening a reader must not block the writer: while the reader is alive the
/// writer can still append and fsync new records.
#[test]
fn reader_opens_without_taking_writer_lock() {
    let tmp = tempdir().unwrap();
    let mut log = RecordLog::open(tmp.path()).unwrap();
    log.append_record(RecordType::Put, b"k", b"v").unwrap();
    log.fsync().unwrap();

    let reader = RecordLogReader::open(tmp.path()).expect("reader opens");
    assert!(!reader.segments().is_empty());

    // The writer keeps working with the reader still open.
    log.append_record(RecordType::Put, b"k2", b"v2").unwrap();
    log.fsync().unwrap();

    // Release the reader first, then the writer.
    drop(reader);
    drop(log);
}
/// Opening a reader on a path that does not exist fails, and the rendered error
/// either names the path or contains a "no such ..." OS message.
#[test]
fn reader_open_fails_on_missing_path() {
    let tmp = tempdir().unwrap();
    let absent = tmp.path().join("does-not-exist");

    let msg = format!("{:#}", RecordLogReader::open(&absent).unwrap_err());
    let names_path = msg.contains("does-not-exist");
    let says_missing = msg.to_lowercase().contains("no such");
    assert!(names_path || says_missing, "unexpected error: {msg}");
}
/// Pointing the reader at a regular file instead of a directory fails, and the
/// rendered error says "not a directory" or names the offending file.
#[test]
fn reader_open_fails_on_non_directory_path() {
    let tmp = tempdir().unwrap();
    let not_a_dir = tmp.path().join("file.txt");
    std::fs::write(&not_a_dir, b"not a dir").unwrap();

    let msg = format!("{:#}", RecordLogReader::open(&not_a_dir).unwrap_err());
    let recognised =
        msg.to_lowercase().contains("not a directory") || msg.contains("file.txt");
    assert!(recognised, "unexpected error: {msg}");
}
/// An existing but empty directory opens as an empty log: zero segments and a
/// scan that yields nothing.
#[test]
fn reader_open_succeeds_on_empty_dir() {
    let tmp = tempdir().unwrap();
    let reader = RecordLogReader::open(tmp.path()).expect("empty dir is fine");
    assert_eq!(reader.segments().len(), 0);

    let records = collect(&reader);
    assert!(records.is_empty());
}
/// The reader's segment list is fixed when it is opened. The writer later
/// rotates into segment 4 and appends there, but the reader still reports and
/// scans only segments 1..=3.
#[test]
fn reader_segments_match_snapshot_taken_at_open() {
    let tmp = tempdir().unwrap();
    let mut log = RecordLog::open(tmp.path()).unwrap();

    // One fsynced record per segment, rotating between them (segments 1, 2, 3).
    let initial: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
    for (idx, &(key, value)) in initial.iter().enumerate() {
        if idx > 0 {
            log.rotate().unwrap();
        }
        log.append_record(RecordType::Put, key, value).unwrap();
        log.fsync().unwrap();
    }

    let reader = RecordLogReader::open(tmp.path()).unwrap();
    assert_eq!(reader.segments(), &[1u32, 2, 3]);

    // The writer moves on to a new segment after the reader was opened.
    log.rotate().unwrap();
    log.append_record(RecordType::Put, b"d", b"4").unwrap();
    log.fsync().unwrap();

    assert_eq!(reader.segments(), &[1u32, 2, 3]);
    let scanned = collect(&reader);
    let keys: Vec<&[u8]> = scanned.iter().map(|(_, key, _)| &key[..]).collect();
    let expected: [&[u8]; 3] = [b"a", b"b", b"c"];
    assert_eq!(keys, expected);
}
/// The snapshot fixes which segments a reader scans, not how far into them.
/// Records appended to an already-listed segment after the reader opened are
/// still returned by a later scan.
#[test]
fn reader_observes_writer_appends_within_snapshot_segments() {
    let tmp = tempdir().unwrap();
    let mut log = RecordLog::open(tmp.path()).unwrap();
    log.append_record(RecordType::Put, b"a", b"1").unwrap();
    log.fsync().unwrap();

    let reader = RecordLogReader::open(tmp.path()).unwrap();
    assert_eq!(reader.segments(), &[1u32]);

    // Append to segment 1 after the reader opened; no rotation happens here.
    log.append_record(RecordType::Put, b"b", b"2").unwrap();
    log.fsync().unwrap();

    let keys: Vec<Vec<u8>> = collect(&reader)
        .into_iter()
        .map(|(_, key, _)| key)
        .collect();
    assert_eq!(keys, [b"a".to_vec(), b"b".to_vec()]);
}
/// `scan_iter` can be called repeatedly on one reader. Three full passes over
/// five records all return the same records.
#[test]
fn reader_scan_iter_can_be_called_multiple_times() {
    let tmp = tempdir().unwrap();
    let mut log = RecordLog::open(tmp.path()).unwrap();
    (0..5u8).for_each(|b| {
        log.append_record(RecordType::Put, &[b], &[b, b, b]).unwrap();
    });
    log.fsync().unwrap();

    let reader = RecordLogReader::open(tmp.path()).unwrap();
    let passes: Vec<_> = (0..3).map(|_| collect(&reader)).collect();
    assert_eq!(passes[0].len(), 5);
    assert_eq!(passes[0], passes[1]);
    assert_eq!(passes[1], passes[2]);
}
/// Stray bytes after the last complete record of the final segment are
/// tolerated. The scan yields only the valid prefix without errors, and the
/// iterator's recovery report counts the truncation and the discarded bytes.
#[test]
fn reader_tolerates_unfsynced_tail_on_last_segment() {
    let tmp = tempdir().unwrap();

    // Two fsynced records; the writer is dropped at the end of this scope.
    {
        let mut log = RecordLog::open(tmp.path()).unwrap();
        for (key, value) in [(b"k1", b"v1"), (b"k2", b"v2")] {
            log.append_record(RecordType::Put, key, value).unwrap();
        }
        log.fsync().unwrap();
    }

    // Append three bytes that do not form a record to segment 1.
    {
        let mut tail = OpenOptions::new()
            .append(true)
            .open(seg_path(tmp.path(), 1))
            .unwrap();
        tail.write_all(&[0xAA, 0xBB, 0xCC]).unwrap();
        tail.sync_all().unwrap();
    }

    let reader = RecordLogReader::open(tmp.path()).unwrap();
    let mut scan = reader.scan_iter().unwrap();
    let count = scan.by_ref().fold(0, |seen, rec| {
        rec.expect("valid prefix should not error");
        seen + 1
    });
    assert_eq!(count, 2, "should yield exactly the valid prefix");

    let report = scan.recovery_report();
    assert!(
        report.tail_truncated >= 1,
        "expected tail_truncated >= 1, got {:?}",
        report
    );
    assert!(
        report.tail_bytes_discarded >= 3,
        "expected at least 3 bytes discarded, got {:?}",
        report
    );
}
/// Corruption inside a sealed (non-tail) segment is a hard error, not a
/// tolerated torn tail.
///
/// Segment 1 holds one record and is sealed by a rotation; segment 2 is the
/// last segment. After flipping one byte near the end of segment 1, scanning
/// must surface an error that describes a decode/CRC/truncation failure and
/// identifies the non-tail segment.
#[test]
fn reader_hard_errors_on_corrupt_sealed_segment() {
    use std::io::Read;

    let dir = tempdir().unwrap();
    {
        let mut writer = RecordLog::open(dir.path()).unwrap();
        writer.append_record(RecordType::Put, b"k1", b"v1").unwrap();
        writer.fsync().unwrap();
        writer.rotate().unwrap();
        writer.append_record(RecordType::Put, b"k2", b"v2").unwrap();
        writer.fsync().unwrap();
    }

    // Invert every bit of the byte two positions before the end of segment 1.
    let path = seg_path(dir.path(), 1);
    {
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .unwrap();
        let len = f.metadata().unwrap().len();
        assert!(len > 4, "segment too small to corrupt safely");
        let target = len - 2;
        let mut byte = [0u8; 1];
        f.seek(SeekFrom::Start(target)).unwrap();
        f.read_exact(&mut byte).unwrap();
        byte[0] ^= 0xFF;
        f.seek(SeekFrom::Start(target)).unwrap();
        f.write_all(&byte).unwrap();
        f.sync_all().unwrap();
    }

    let reader = RecordLogReader::open(dir.path()).unwrap();
    // Precondition: the corrupted segment must NOT be the tail. Otherwise the
    // reader's tail tolerance could legitimately swallow the damage and this
    // test would fail for the wrong reason.
    assert_eq!(reader.segments(), &[1u32, 2]);

    // Take the first error the scan yields; any Ok records before it are ignored.
    let err = reader
        .scan_iter()
        .unwrap()
        .find_map(Result::err)
        .expect("iterator should have surfaced the corruption");
    let msg = format!("{:#}", err);
    let lower = msg.to_lowercase();
    assert!(
        lower.contains("crc mismatch")
            || lower.contains("truncated record")
            || lower.contains("decode error"),
        "expected sealed-segment failure, got: {msg}"
    );
    assert!(
        lower.contains("non-tail") || lower.contains("segment 1"),
        "expected message to mention non-tail / segment 1, got: {msg}"
    );
}