use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use datawal::DataWal;
use tempfile::tempdir;
/// Builds the path of segment file `id` inside `dir`.
///
/// The file name is the id zero-padded to (at least) eight digits followed by
/// the `.dwal` extension, e.g. id `1` → `00000001.dwal`.
fn seg_path(dir: &Path, id: u32) -> PathBuf {
    let file_name = format!("{id:08}.dwal");
    dir.join(file_name)
}
/// Records written and fsynced through one handle are readable through a
/// fresh handle opened on the same directory; an unknown key reads as `None`.
#[test]
fn get_reads_payload_lazily_after_drop_and_reopen() {
    let dir = tempdir().unwrap();
    let root = dir.path();

    // Scope the writer so it is dropped before the directory is reopened.
    {
        let mut writer = DataWal::open(root).expect("open");
        writer.put(b"alpha", b"one").expect("put alpha");
        writer.put(b"beta", b"two").expect("put beta");
        writer.fsync().expect("fsync");
    }

    let mut reader = DataWal::open(root).expect("reopen");

    let alpha = reader.get(b"alpha").unwrap();
    assert_eq!(alpha.as_deref(), Some(&b"one"[..]));

    let beta = reader.get(b"beta").unwrap();
    assert_eq!(beta.as_deref(), Some(&b"two"[..]));

    let missing = reader.get(b"missing").unwrap();
    assert!(missing.is_none());
}
/// Flipping a byte of a stored record on disk, behind the store's back, must
/// make the next `get` of that key fail with a CRC or decode error instead of
/// returning data.
#[test]
fn get_revalidates_crc_and_errors_on_corruption() {
    let dir = tempdir().unwrap();
    let mut kv = DataWal::open(dir.path()).expect("open");
    kv.put(b"k", b"some-payload-that-will-be-corrupted-on-disk")
        .expect("put");
    kv.fsync().expect("fsync");
    assert!(kv.ref_of(b"k").is_some(), "record must be in keydir");

    // The only record was written to segment 1. Target the byte 5 bytes before
    // the end of the file, which for this single-record segment presumably lies
    // inside the record's payload or checksum — TODO confirm against the format.
    let seg = seg_path(dir.path(), 1);
    let len = std::fs::metadata(&seg).unwrap().len();
    // A plain `len - 5` underflows on a truncated segment (an opaque overflow
    // panic in debug builds, a wrapped offset in release); report it clearly.
    let target_offset = len.checked_sub(5).unwrap_or_else(|| {
        panic!(
            "segment {} is only {len} bytes; too short to corrupt",
            seg.display()
        )
    });

    // Scope the handle so it is closed before the store reads the file again.
    {
        let mut f = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&seg)
            .unwrap();
        f.seek(SeekFrom::Start(target_offset)).unwrap();
        let mut byte = [0u8; 1];
        f.read_exact(&mut byte).unwrap();
        // Inverting every bit guarantees the stored byte actually changes.
        byte[0] ^= 0xFF;
        f.seek(SeekFrom::Start(target_offset)).unwrap();
        f.write_all(&byte).unwrap();
        f.sync_all().unwrap();
    }

    let err = kv
        .get(b"k")
        .expect_err("get should fail on corrupt payload");
    let msg = format!("{err:#}").to_lowercase();
    assert!(
        msg.contains("crc") || msg.contains("decode"),
        "expected CRC/decode error, got: {msg}"
    );
}
/// `ref_of` only reports a location while the key is live.
///
/// It returns nothing before the first write and a segment-1 reference after
/// it. Once a delete tombstone is written it returns nothing again, and `get`
/// must agree with it.
#[test]
fn ref_of_tracks_live_keys_and_returns_none_after_delete() {
    let dir = tempdir().unwrap();
    let mut kv = DataWal::open(dir.path()).expect("open");
    let key = b"x";

    // Never written: no location.
    assert!(kv.ref_of(key).is_none());

    // After a durable put the key points into the first segment.
    kv.put(key, b"v1").expect("put");
    kv.fsync().expect("fsync");
    let live = kv.ref_of(key).expect("x should be live");
    assert_eq!(live.segment, 1);
    assert!(live.len > 0);

    // After a durable delete the location disappears and reads see nothing.
    kv.delete(key).expect("delete");
    kv.fsync().expect("fsync");
    assert!(
        kv.ref_of(key).is_none(),
        "ref_of should return None after delete tombstone"
    );
    let after_delete = kv.get(key).unwrap();
    assert_eq!(
        after_delete,
        None,
        "get should agree with ref_of after delete"
    );
}
/// Overwriting a key moves its `ref_of` location forward within the same
/// segment, and `get` then returns the newest value.
#[test]
fn ref_of_advances_to_latest_overwrite() {
    let dir = tempdir().unwrap();
    let mut kv = DataWal::open(dir.path()).expect("open");
    let key = b"k";

    kv.put(key, b"first").expect("put first");
    let r1 = kv.ref_of(key).expect("first ref");

    kv.put(key, b"second-longer").expect("put second");
    let r2 = kv.ref_of(key).expect("second ref");

    // Both writes are appended to the same segment, the second after the first.
    assert_eq!(r1.segment, r2.segment, "same segment for both writes");
    let advanced = r2.offset > r1.offset;
    assert!(
        advanced,
        "second ref must be past the first: r1={r1:?} r2={r2:?}"
    );

    kv.fsync().expect("fsync");
    let latest = kv.get(key).unwrap();
    assert_eq!(latest.as_deref(), Some(&b"second-longer"[..]));
}
/// A store produced by `compact_to` stays fully usable.
///
/// Records carried over by compaction read back correctly alongside records
/// appended after the compacted store is reopened.
///
/// NOTE(review): despite its name, this test reaches the "sealed segment"
/// state through `compact_to` rather than by forcing segment rotation —
/// confirm that compacted segments are treated as sealed by the store.
#[test]
fn get_works_for_records_in_sealed_segment_after_rotation() {
    let src = tempdir().unwrap();
    let mut source = DataWal::open(src.path()).expect("open");
    source
        .put(b"sealed-key", b"sealed-value")
        .expect("put pre-rotate");
    source.fsync().expect("fsync");

    let dst = tempdir().unwrap();
    // Compaction statistics are not inspected here; only the output store matters.
    let _ = source.compact_to(dst.path()).expect("compact_to");

    let mut compacted = DataWal::open(dst.path()).expect("open compacted");
    compacted
        .put(b"new-key", b"new-value")
        .expect("put post-compact");
    compacted.fsync().expect("fsync");

    let carried_over = compacted.get(b"sealed-key").unwrap();
    assert_eq!(carried_over.as_deref(), Some(&b"sealed-value"[..]));

    let appended = compacted.get(b"new-key").unwrap();
    assert_eq!(appended.as_deref(), Some(&b"new-value"[..]));
}
/// Repeated `get`s over the same keys must not leak file descriptors.
///
/// On Linux, the number of entries in `/proc/self/fd` (the process's open
/// fds) is sampled after a warm-up read pass and again after five more passes.
/// The count may grow only by a small slack. The slack also absorbs fds opened
/// concurrently by other tests, because libtest runs tests on parallel threads
/// inside one process by default.
#[test]
fn fd_pool_reuses_fd_across_repeated_gets() {
    let dir = tempdir().unwrap();
    let mut kv = DataWal::open(dir.path()).expect("open");

    // Build the key/value pairs once instead of re-formatting them on every pass.
    let pairs: Vec<(Vec<u8>, Vec<u8>)> = (0u32..100)
        .map(|i| (format!("k{i:03}").into_bytes(), format!("v{i:03}").into_bytes()))
        .collect();
    for (k, v) in &pairs {
        kv.put(k, v).expect("put");
    }
    kv.fsync().expect("fsync");

    // Warm-up pass: if the store opens read-side fds lazily, they get opened here,
    // before the baseline is sampled, so they are not counted as growth below.
    for (k, v) in &pairs {
        assert_eq!(kv.get(k).unwrap().as_deref(), Some(&v[..]));
    }

    #[cfg(target_os = "linux")]
    let before = std::fs::read_dir("/proc/self/fd").map(|d| d.count()).ok();
    for _ in 0..5 {
        for (k, v) in &pairs {
            assert_eq!(kv.get(k).unwrap().as_deref(), Some(&v[..]));
        }
    }
    #[cfg(target_os = "linux")]
    {
        let after = std::fs::read_dir("/proc/self/fd").map(|d| d.count()).ok();
        // If /proc is unavailable (e.g. a restricted sandbox), skip the fd check.
        if let (Some(b), Some(a)) = (before, after) {
            assert!(
                a <= b + 32,
                "fd count grew from {b} to {a} -- fd pool not reusing"
            );
        }
    }
}
/// `items` returns exactly the live key/value pairs: the deleted key is
/// absent and the surviving keys carry their written values.
///
/// NOTE(review): the name mentions CRC revalidation, but no on-disk corruption
/// is injected here, so only the uncorrupted path of `items` is exercised.
#[test]
fn items_returns_all_live_pairs_and_revalidates_crc() {
    let dir = tempdir().unwrap();
    let mut kv = DataWal::open(dir.path()).expect("open");
    kv.put(b"a", b"1").expect("put a");
    kv.put(b"b", b"2").expect("put b");
    kv.put(b"c", b"3").expect("put c");
    kv.delete(b"b").expect("delete b");
    kv.fsync().expect("fsync");

    let expected = vec![
        (b"a".to_vec(), b"1".to_vec()),
        (b"c".to_vec(), b"3".to_vec()),
    ];

    let mut items = kv.items().expect("items");
    // Sort so the comparison does not depend on the order `items` yields pairs in.
    items.sort();
    assert_eq!(items, expected);
}
/// Compaction carries over exactly the live state.
///
/// In the compacted store:
/// - keys 0..25 (overwritten) keep their newest value;
/// - keys 40..45 (deleted) are gone;
/// - all other keys keep their original value;
/// - the reported `live_keys` count matches.
#[test]
fn compact_to_preserves_live_state_via_offset_keydir() {
    let key = |i: u32| format!("k{i:02}").into_bytes();
    let original_value = |i: u32| format!("v{i:02}-payload").into_bytes();
    let overwritten_value = |i: u32| format!("OVERWRITE-{i:02}").into_bytes();

    let dir = tempdir().unwrap();
    let mut kv = DataWal::open(dir.path()).expect("open");
    for i in 0u32..50 {
        kv.put(&key(i), &original_value(i)).expect("put");
    }
    for i in 0u32..25 {
        kv.put(&key(i), &overwritten_value(i)).expect("overwrite");
    }
    for i in 40u32..45 {
        kv.delete(&key(i)).expect("delete");
    }
    kv.fsync().expect("fsync");

    let dst = tempdir().unwrap();
    let stats = kv.compact_to(dst.path()).expect("compact_to");
    assert_eq!(stats.live_keys, 50 - 5);

    let mut compacted = DataWal::open(dst.path()).expect("open compacted");
    // Walk every key in ascending order and check it against the phase that last touched it.
    for i in 0u32..50 {
        let got = compacted.get(&key(i)).unwrap();
        match i {
            0..=24 => assert_eq!(got.as_deref(), Some(&overwritten_value(i)[..])),
            40..=44 => assert!(got.is_none()),
            _ => assert_eq!(got.as_deref(), Some(&original_value(i)[..])),
        }
    }
}