use std::sync::Arc;
use holt::{BlobStore, Durability, Error, MemoryBlobStore, Tree, TreeBuilder, TreeConfig, DB};
use tempfile::tempdir;
#[test]
fn snapshot_isolates_root_local_writes() {
let tree = Tree::open(TreeConfig::memory()).unwrap();
for i in 0..5u32 {
tree.put(format!("k{i}").as_bytes(), format!("v{i}").as_bytes())
.unwrap();
}
let snap = tree.snapshot(b"").unwrap();
tree.put(b"k0", b"OVERWRITTEN").unwrap();
tree.put(b"k9", b"new").unwrap();
assert_eq!(snap.get(b"k0").unwrap().as_deref(), Some(&b"v0"[..]));
assert_eq!(snap.get(b"k9").unwrap(), None);
for i in 1..5u32 {
assert_eq!(
snap.get(format!("k{i}").as_bytes()).unwrap().as_deref(),
Some(format!("v{i}").as_bytes()),
);
}
assert_eq!(
tree.get(b"k0").unwrap().as_deref(),
Some(&b"OVERWRITTEN"[..]),
);
assert_eq!(tree.get(b"k9").unwrap().as_deref(), Some(&b"new"[..]));
}
#[test]
fn snapshot_reads_across_blob_boundaries() {
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let tree = TreeBuilder::new("ignored")
.open_with_blob_store(store.clone())
.unwrap();
const N: u32 = 5000;
let value = vec![0xAB_u8; 200];
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &value).unwrap();
}
assert!(
store.list_blobs().unwrap().len() >= 2,
"test needs a multi-blob tree to be meaningful",
);
let snap = tree.snapshot(b"").unwrap();
for i in 0..N {
assert_eq!(
snap.get(format!("k{i:08}").as_bytes()).unwrap().as_deref(),
Some(&value[..]),
"snapshot lost key {i} across a blob-frame boundary",
);
}
}
#[test]
fn snapshot_scope_restricts_reads() {
let tree = Tree::open(TreeConfig::memory()).unwrap();
tree.put(b"users/alice", b"1").unwrap();
tree.put(b"users/bob", b"2").unwrap();
tree.put(b"orders/x", b"9").unwrap();
let snap = tree.snapshot(b"users/").unwrap();
assert_eq!(
snap.get(b"users/alice").unwrap().as_deref(),
Some(&b"1"[..])
);
assert_eq!(snap.scope(), b"users/");
let err = snap.get(b"orders/x").unwrap_err();
assert!(
matches!(err, Error::OutsideViewScope { .. }),
"out-of-scope read should be rejected, got {err:?}",
);
}
#[test]
fn snapshot_epochs_advance_and_retire() {
let tree = Tree::open(TreeConfig::memory()).unwrap();
tree.put(b"a", b"1").unwrap();
let s1 = tree.snapshot(b"").unwrap();
let e1 = s1.epoch();
let s2 = tree.snapshot(b"").unwrap();
let e2 = s2.epoch();
assert!(e2 > e1, "epochs must advance: {e1} then {e2}");
s1.retire();
drop(s2);
let s3 = tree.snapshot(b"").unwrap();
assert!(s3.epoch() > e2, "epoch must keep advancing past {e2}");
}
#[test]
fn snapshot_isolates_cross_blob_writes() {
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let tree = TreeBuilder::new("ignored")
.open_with_blob_store(store.clone())
.unwrap();
const N: u32 = 5000;
let orig = vec![0xAB_u8; 200];
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &orig).unwrap();
}
assert!(
store.list_blobs().unwrap().len() >= 2,
"test needs a multi-blob tree",
);
let snap = tree.snapshot(b"").unwrap();
for i in (0..N).step_by(4) {
tree.put(format!("k{i:08}").as_bytes(), b"UPDATED").unwrap();
}
for i in N..N + 100 {
tree.put(format!("k{i:08}").as_bytes(), b"brand-new")
.unwrap();
}
for i in (2..N).step_by(7) {
tree.delete(format!("k{i:08}").as_bytes()).unwrap();
}
for i in 0..N {
assert_eq!(
snap.get(format!("k{i:08}").as_bytes()).unwrap().as_deref(),
Some(&orig[..]),
"snapshot key {i} changed under a live cross-blob write",
);
}
for i in N..N + 100 {
assert_eq!(snap.get(format!("k{i:08}").as_bytes()).unwrap(), None);
}
for i in 0..N {
let k = format!("k{i:08}");
let live = tree.get(k.as_bytes()).unwrap();
if i >= 2 && (i - 2) % 7 == 0 {
assert_eq!(live, None, "live key {i} should be deleted");
} else if i % 4 == 0 {
assert_eq!(
live.as_deref(),
Some(&b"UPDATED"[..]),
"live key {i} should be updated",
);
} else {
assert_eq!(live.as_deref(), Some(&orig[..]), "live key {i} unchanged");
}
}
for i in N..N + 100 {
assert_eq!(
tree.get(format!("k{i:08}").as_bytes()).unwrap().as_deref(),
Some(&b"brand-new"[..]),
);
}
}
#[test]
fn nested_cross_blob_snapshots_each_isolated() {
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let tree = TreeBuilder::new("ignored")
.open_with_blob_store(store.clone())
.unwrap();
const N: u32 = 5000;
let v1 = vec![0x01_u8; 200];
let v2 = vec![0x02_u8; 200];
let v3 = vec![0x03_u8; 200];
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &v1).unwrap();
}
assert!(store.list_blobs().unwrap().len() >= 2);
let s1 = tree.snapshot(b"").unwrap();
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &v2).unwrap();
}
let s2 = tree.snapshot(b"").unwrap();
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &v3).unwrap();
}
for i in 0..N {
let k = format!("k{i:08}");
assert_eq!(
s1.get(k.as_bytes()).unwrap().as_deref(),
Some(&v1[..]),
"s1 key {i}",
);
assert_eq!(
s2.get(k.as_bytes()).unwrap().as_deref(),
Some(&v2[..]),
"s2 key {i}",
);
assert_eq!(
tree.get(k.as_bytes()).unwrap().as_deref(),
Some(&v3[..]),
"live key {i}",
);
}
}
#[test]
fn snapshot_stable_under_randomized_churn() {
use std::collections::HashMap;
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let tree = TreeBuilder::new("ignored")
.open_with_blob_store(store)
.unwrap();
let mut lcg: u64 = 0x9E37_79B9_7F4A_7C15;
let mut next = move || {
lcg = lcg
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407);
(lcg >> 33) as u32
};
let mut live: HashMap<String, Vec<u8>> = HashMap::new();
for i in 0..1500u32 {
let k = format!("key{i:06}");
let v = vec![(i & 0xFF) as u8; 180];
tree.put(k.as_bytes(), &v).unwrap();
live.insert(k, v);
}
let snap = tree.snapshot(b"").unwrap();
let frozen = live.clone();
for _ in 0..6000 {
let k = format!("key{:06}", next() % 1800);
if next() % 4 == 0 {
tree.delete(k.as_bytes()).unwrap();
live.remove(&k);
} else {
let vlen = 1 + (next() % 200) as usize;
let v = vec![(next() & 0xFF) as u8; vlen];
tree.put(k.as_bytes(), &v).unwrap();
live.insert(k, v);
}
}
for (k, v) in &frozen {
assert_eq!(
snap.get(k.as_bytes()).unwrap().as_deref(),
Some(&v[..]),
"snapshot drifted at {k}",
);
}
for i in 1500..1800u32 {
let k = format!("key{i:06}");
assert_eq!(
snap.get(k.as_bytes()).unwrap(),
None,
"snapshot saw post-snapshot key {k}",
);
}
for (k, v) in &live {
assert_eq!(
tree.get(k.as_bytes()).unwrap().as_deref(),
Some(&v[..]),
"live tree drifted at {k}",
);
}
}
#[test]
fn retire_reclaims_forked_frames() {
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let tree = TreeBuilder::new("ignored")
.open_with_blob_store(store.clone())
.unwrap();
const N: u32 = 5000;
let orig = vec![0xAB_u8; 200];
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &orig).unwrap();
}
tree.checkpoint().unwrap();
let baseline = store.list_blobs().unwrap().len();
assert!(baseline >= 2, "need a multi-blob tree");
{
let snap = tree.snapshot(b"").unwrap();
for i in (0..N).step_by(3) {
tree.put(format!("k{i:08}").as_bytes(), b"x").unwrap();
}
tree.checkpoint().unwrap();
let during = store.list_blobs().unwrap().len();
assert!(
during > baseline,
"snapshot + forks should add blobs: {during} vs {baseline}",
);
assert_eq!(snap.get(b"k00000000").unwrap().as_deref(), Some(&orig[..]));
}
tree.checkpoint().unwrap();
let after = store.list_blobs().unwrap().len();
assert_eq!(
after, baseline,
"retire must reclaim every snapshot frame: {after} vs {baseline}",
);
for i in 0..N {
let want: &[u8] = if i % 3 == 0 { b"x" } else { &orig };
assert_eq!(
tree.get(format!("k{i:08}").as_bytes()).unwrap().as_deref(),
Some(want),
"live key {i}",
);
}
}
#[test]
fn overlapping_snapshots_reclaim_after_last_retires() {
let store: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let tree = TreeBuilder::new("ignored")
.open_with_blob_store(store.clone())
.unwrap();
const N: u32 = 5000;
let v = vec![0xAB_u8; 200];
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), &v).unwrap();
}
tree.checkpoint().unwrap();
let baseline = store.list_blobs().unwrap().len();
assert!(baseline >= 2);
let s1 = tree.snapshot(b"").unwrap();
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), b"a").unwrap();
}
let s2 = tree.snapshot(b"").unwrap();
for i in 0..N {
tree.put(format!("k{i:08}").as_bytes(), b"b").unwrap();
}
tree.checkpoint().unwrap();
assert!(store.list_blobs().unwrap().len() > baseline);
drop(s1);
tree.checkpoint().unwrap();
drop(s2);
tree.checkpoint().unwrap();
let after = store.list_blobs().unwrap().len();
assert_eq!(
after, baseline,
"all snapshot frames reclaimed after the last retire: {after} vs {baseline}",
);
for i in 0..N {
assert_eq!(
tree.get(format!("k{i:08}").as_bytes()).unwrap().as_deref(),
Some(&b"b"[..]),
"live key {i}",
);
}
}
#[test]
fn snapshot_correct_after_reopen() {
let dir = tempdir().unwrap();
let cfg = || {
let mut c = TreeConfig::new(dir.path());
c.checkpoint.enabled = false;
c.durability = Durability::Wal { sync: true };
c
};
const N: u32 = 5000;
let v1 = vec![0x01_u8; 200];
let v2 = vec![0x02_u8; 200];
let v3 = vec![0x03_u8; 200];
{
let tree = Tree::open(cfg()).unwrap();
for i in 0..N {
tree.put(format!("k{i:06}").as_bytes(), &v1).unwrap();
}
{
let snap = tree.snapshot(b"").unwrap();
for i in 0..N {
tree.put(format!("k{i:06}").as_bytes(), &v2).unwrap();
}
assert_eq!(snap.get(b"k000000").unwrap().as_deref(), Some(&v1[..]));
} tree.checkpoint().unwrap();
}
let tree = Tree::open(cfg()).unwrap();
for i in 0..N {
assert_eq!(
tree.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&v2[..]),
"reopened live key {i}",
);
}
let snap = tree.snapshot(b"").unwrap();
for i in 0..N {
tree.put(format!("k{i:06}").as_bytes(), &v3).unwrap();
}
for i in 0..N {
assert_eq!(
snap.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&v2[..]),
"post-reopen snapshot key {i} was corrupted by a live write",
);
assert_eq!(
tree.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&v3[..]),
"post-reopen live key {i}",
);
}
}
const CRASH_LEAK_DIR_ENV: &str = "HOLT_CRASH_LEAK_DIR";
const CRASH_LEAK_N: u32 = 5000;
fn crash_leak_value() -> Vec<u8> {
vec![0xAB_u8; 200]
}
fn crash_leak_cfg(dir: &std::path::Path) -> TreeConfig {
let mut c = TreeConfig::new(dir);
c.checkpoint.enabled = false;
c.durability = Durability::Wal { sync: true };
c
}
fn run_crash_session(child_test: &str, dir: &std::path::Path) {
let exe = std::env::current_exe().unwrap();
let status = std::process::Command::new(exe)
.args([child_test, "--exact", "--ignored", "--nocapture"])
.env(CRASH_LEAK_DIR_ENV, dir)
.status()
.unwrap();
assert!(status.success(), "crash-session child {child_test} failed");
}
#[test]
#[ignore = "child-process body for gc_reclaims_crash_leaked_snapshot_frames"]
fn crash_leak_tree_session() {
let Some(dir) = std::env::var_os(CRASH_LEAK_DIR_ENV) else {
return;
};
let dir = std::path::PathBuf::from(dir);
let v = crash_leak_value();
let tree = Tree::open(crash_leak_cfg(&dir)).unwrap();
for i in 0..CRASH_LEAK_N {
tree.put(format!("k{i:06}").as_bytes(), &v).unwrap();
}
tree.checkpoint().unwrap();
let snap = tree.snapshot(b"").unwrap();
for i in 0..CRASH_LEAK_N {
tree.put(format!("k{i:06}").as_bytes(), b"new").unwrap();
}
tree.checkpoint().unwrap();
std::mem::forget(snap);
}
#[test]
fn gc_reclaims_crash_leaked_snapshot_frames() {
let dir = tempdir().unwrap();
let cfg = || crash_leak_cfg(dir.path());
const N: u32 = CRASH_LEAK_N;
run_crash_session("crash_leak_tree_session", dir.path());
let tree = Tree::open(cfg()).unwrap();
for i in 0..N {
assert_eq!(
tree.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&b"new"[..]),
"reopened live key {i}",
);
}
let freed = tree.gc().unwrap();
assert!(
freed > 0,
"gc should reclaim crash-leaked snapshot frames, freed {freed}",
);
assert_eq!(tree.gc().unwrap(), 0, "second gc must be a no-op");
for i in 0..N {
assert_eq!(
tree.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&b"new"[..]),
"live key {i} after gc",
);
}
}
#[test]
#[ignore = "child-process body for db_gc_reclaims_leak_and_preserves_all_trees"]
fn crash_leak_db_session() {
let Some(dir) = std::env::var_os(CRASH_LEAK_DIR_ENV) else {
return;
};
let dir = std::path::PathBuf::from(dir);
let v = crash_leak_value();
let db = DB::open(crash_leak_cfg(&dir)).unwrap();
let t1 = db.create_tree("t1").unwrap();
let t2 = db.create_tree("t2").unwrap();
for i in 0..CRASH_LEAK_N {
t1.put(format!("k{i:06}").as_bytes(), &v).unwrap();
t2.put(format!("k{i:06}").as_bytes(), &v).unwrap();
}
db.checkpoint().unwrap();
let snap = t1.snapshot(b"").unwrap();
for i in 0..CRASH_LEAK_N {
t1.put(format!("k{i:06}").as_bytes(), b"new").unwrap();
}
db.checkpoint().unwrap();
std::mem::forget(snap); }
#[test]
fn db_gc_reclaims_leak_and_preserves_all_trees() {
let dir = tempdir().unwrap();
let cfg = || crash_leak_cfg(dir.path());
const N: u32 = CRASH_LEAK_N;
let v = crash_leak_value();
run_crash_session("crash_leak_db_session", dir.path());
let db = DB::open(cfg()).unwrap();
let freed = db.gc().unwrap();
assert!(
freed > 0,
"db gc should reclaim crash-leaked frames, freed {freed}",
);
assert_eq!(db.gc().unwrap(), 0, "second db gc must be a no-op");
let t1 = db.open_tree("t1").unwrap();
let t2 = db.open_tree("t2").unwrap();
for i in 0..N {
assert_eq!(
t1.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&b"new"[..]),
"t1 key {i}",
);
assert_eq!(
t2.get(format!("k{i:06}").as_bytes()).unwrap().as_deref(),
Some(&v[..]),
"t2 key {i}",
);
}
}
#[test]
fn gc_rejects_db_trees() {
let dir = tempdir().unwrap();
let db = DB::open(TreeConfig::new(dir.path())).unwrap();
let tree = db.create_tree("t").unwrap();
assert!(
matches!(tree.gc(), Err(Error::GcRequiresStandaloneTree)),
"gc on a DB tree must be rejected",
);
}