use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::tempdir;
use holt::{AlignedBlobBuf, Backend, CheckpointConfig, MemoryBackend, Tree, TreeConfig};
/// A [`Backend`] decorator that forwards every call to `inner`, but can be
/// armed so that one chosen `write_blob`, `delete_blob`, or `flush` call
/// fails with a synthetic I/O error instead of reaching `inner`.
struct FailpointBackend {
    /// The real backend that every non-failing call is delegated to.
    inner: Arc<dyn Backend>,

    // Per-operation state: `*_calls` counts every call made so far, and
    // `fail_*_at` holds the 1-based call number that should fail
    // (`usize::MAX` means disarmed).
    write_calls: AtomicUsize,
    fail_write_at: AtomicUsize,

    delete_calls: AtomicUsize,
    fail_delete_at: AtomicUsize,

    flush_calls: AtomicUsize,
    fail_flush_at: AtomicUsize,
}
impl FailpointBackend {
    /// Wraps `inner` with every call counter at zero and every failpoint disarmed.
    fn new(inner: Arc<dyn Backend>) -> Self {
        let counter = || AtomicUsize::new(0);
        // `usize::MAX` marks a failpoint as disarmed: call numbers start at 1,
        // so in practice no call ever reaches it.
        let disarmed = || AtomicUsize::new(usize::MAX);
        Self {
            inner,
            delete_calls: counter(),
            flush_calls: counter(),
            write_calls: counter(),
            fail_delete_at: disarmed(),
            fail_flush_at: disarmed(),
            fail_write_at: disarmed(),
        }
    }

    /// Makes the `nth` `delete_blob` call (1-based, counted since
    /// construction) fail once.
    fn arm_delete(&self, nth: usize) {
        self.fail_delete_at.store(nth, Ordering::SeqCst)
    }

    /// Makes the `nth` `flush` call (1-based, counted since construction)
    /// fail once.
    fn arm_flush(&self, nth: usize) {
        self.fail_flush_at.store(nth, Ordering::SeqCst)
    }

    /// Makes the `nth` `write_blob` call (1-based, counted since
    /// construction) fail once.
    fn arm_write(&self, nth: usize) {
        self.fail_write_at.store(nth, Ordering::SeqCst)
    }

    /// Number of `delete_blob` calls seen so far, including failed ones.
    fn delete_count(&self) -> usize {
        self.delete_calls.load(Ordering::SeqCst)
    }

    /// Number of `flush` calls seen so far, including failed ones.
    // Allowed to be unused: some tests read `flush_calls` directly instead.
    #[allow(dead_code)]
    fn flush_count(&self) -> usize {
        self.flush_calls.load(Ordering::SeqCst)
    }
}
fn failpoint_err(msg: &'static str) -> holt::Error {
holt::Error::BackendIo(io::Error::other(msg))
}
impl Backend for FailpointBackend {
fn read_blob(&self, guid: holt::BlobGuid, dst: &mut AlignedBlobBuf) -> holt::Result<()> {
self.inner.read_blob(guid, dst)
}
fn write_blob(
&self,
guid: holt::BlobGuid,
src: &AlignedBlobBuf,
) -> holt::Result<()> {
let n = self.write_calls.fetch_add(1, Ordering::SeqCst) + 1;
let armed = self.fail_write_at.load(Ordering::SeqCst);
if n == armed {
self.fail_write_at.store(usize::MAX, Ordering::SeqCst);
return Err(failpoint_err("failpoint: write_blob"));
}
self.inner.write_blob(guid, src)
}
fn delete_blob(&self, guid: holt::BlobGuid) -> holt::Result<()> {
let n = self.delete_calls.fetch_add(1, Ordering::SeqCst) + 1;
let armed = self.fail_delete_at.load(Ordering::SeqCst);
if n == armed {
self.fail_delete_at.store(usize::MAX, Ordering::SeqCst);
return Err(failpoint_err("failpoint: delete_blob"));
}
self.inner.delete_blob(guid)
}
fn list_blobs(&self) -> holt::Result<Vec<holt::BlobGuid>> {
self.inner.list_blobs()
}
fn flush(&self) -> holt::Result<()> {
let n = self.flush_calls.fetch_add(1, Ordering::SeqCst) + 1;
let armed = self.fail_flush_at.load(Ordering::SeqCst);
if n == armed {
self.fail_flush_at.store(usize::MAX, Ordering::SeqCst);
return Err(failpoint_err("failpoint: flush"));
}
self.inner.flush()
}
fn has_blob(&self, guid: holt::BlobGuid) -> holt::Result<bool> {
self.inner.has_blob(guid)
}
}
/// Opens a memory-backed tree behind a [`FailpointBackend`] and fills it with
/// 1000 keys (`k00000`..`k00999`) of 1 KiB each. It then deletes the first 950
/// keys and compacts.
///
/// Returns `(inner, failpoint, tree)`:
/// - `inner`: the raw backend, for inspecting what actually landed.
/// - `failpoint`: the wrapper, for arming failures.
/// - `tree`: the tree itself.
///
/// Callers assert that this leaves pending deletes (and dirty entries) queued.
fn setup_with_pending_delete() -> (Arc<dyn Backend>, Arc<FailpointBackend>, Tree) {
    const KEY_COUNT: u32 = 1000;
    const DELETED_PREFIX: u32 = 950;

    let inner: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    let fp = Arc::new(FailpointBackend::new(Arc::clone(&inner)));

    let mut cfg = TreeConfig::memory();
    // NOTE(review): presumably stops per-write flushing so state stays
    // queued until an explicit checkpoint — confirm against TreeConfig docs.
    cfg.memory_flush_on_write = false;
    let backend: Arc<dyn Backend> = fp.clone();
    let tree = Tree::open_with_backend(cfg, backend).unwrap();

    let key = |i: u32| format!("k{i:05}");
    let payload = vec![b'z'; 1024];
    for i in 0..KEY_COUNT {
        tree.put(key(i).as_bytes(), &payload).unwrap();
    }
    for i in 0..DELETED_PREFIX {
        let _ = tree.delete(key(i).as_bytes()).unwrap();
    }
    tree.compact().unwrap();

    (inner, fp, tree)
}
/// A failed `delete_blob` during checkpoint must be reported, leave the entry
/// queued, and succeed on the next checkpoint.
#[test]
fn pending_delete_execute_failure_is_retried_next_round() {
    let (inner, fp, tree) = setup_with_pending_delete();

    let queued = tree.stats().unwrap().bm_pending_delete_count;
    assert!(
        queued > 0,
        "setup must produce at least one pending delete (got {})",
        queued,
    );

    // Round 1: the very next delete_blob call fails.
    fp.arm_delete(fp.delete_count() + 1);
    assert!(
        tree.checkpoint().is_err(),
        "checkpoint must surface the first delete_blob failure",
    );
    let still_queued = tree.stats().unwrap().bm_pending_delete_count;
    assert!(still_queued > 0, "failed delete entry must stay queued for retry");

    // Round 2: the failpoint is spent, so the retry drains the queue.
    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_pending_delete_count, 0);

    // The backend now holds exactly as many blobs as the tree accounts for.
    let on_backend = inner.list_blobs().unwrap().len() as u32;
    let expected = tree.stats().unwrap().blob_count;
    assert_eq!(
        on_backend, expected,
        "after retry, backend manifest count = tree blob count",
    );
}
/// A failure of the Sync issued after deletes are applied must put the
/// applied entries back into the pending queue, and the next checkpoint must
/// finish the job.
#[test]
fn pending_delete_sync_failure_keeps_state_for_retry() {
    let (inner, fp, tree) = setup_with_pending_delete();
    assert!(
        tree.stats().unwrap().bm_pending_delete_count > 0,
        "setup precondition: pending delete queued",
    );

    // Target the second flush issued from here on. That is the post-delete
    // Sync, assuming the checkpoint's first flush is the pre-delete Sync.
    // Arming relative to the current counter (rather than the absolute call
    // number 2) keeps the failpoint on target even if setup already flushed.
    // With an absolute index, an earlier flush would shift the failpoint or
    // disable it entirely.
    let flushes_pre = fp.flush_count();
    fp.arm_flush(flushes_pre + 2);
    let result1 = tree.checkpoint();
    assert!(
        result1.is_err(),
        "checkpoint must surface the post-delete Sync failure",
    );
    // Guard against the checkpoint failing for some unrelated reason before it
    // ever reached the armed flush.
    let flushes_post = fp.flush_count();
    assert!(
        flushes_post >= flushes_pre + 2,
        "checkpoint must reach the armed post-delete flush (flushes {} -> {})",
        flushes_pre,
        flushes_post,
    );
    assert!(
        tree.stats().unwrap().bm_pending_delete_count > 0,
        "post-delete Sync failure must restore applied entries to pending",
    );

    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_pending_delete_count, 0);

    let backend_blobs = inner.list_blobs().unwrap();
    let stats = tree.stats().unwrap();
    assert_eq!(
        backend_blobs.len() as u32,
        stats.blob_count,
        "after retry, backend manifest count = tree blob count",
    );
}
/// A failed dirty `write_blob` must be reported, keep the entry dirty, and be
/// persisted by the next checkpoint without losing the value.
#[test]
fn dirty_write_failure_is_retried_next_round() {
    let inner: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    let fp = Arc::new(FailpointBackend::new(Arc::clone(&inner)));
    let tree = {
        let mut cfg = TreeConfig::memory();
        cfg.memory_flush_on_write = false;
        let backend: Arc<dyn Backend> = fp.clone();
        Tree::open_with_backend(cfg, backend).unwrap()
    };
    tree.put(b"k1", b"v1").unwrap();

    // Fail the next write_blob, i.e. the checkpoint's attempt to persist k1.
    let next_write = fp.write_calls.load(Ordering::SeqCst) + 1;
    fp.arm_write(next_write);
    let first = tree.checkpoint();
    assert!(first.is_err(), "first checkpoint should surface failpoint write error");
    assert!(
        tree.stats().unwrap().bm_dirty_count >= 1,
        "failed write must leave dirty entry for retry",
    );

    // The failpoint is spent, so this checkpoint succeeds.
    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_dirty_count, 0);
    let value = tree.get(b"k1").unwrap();
    assert_eq!(value.as_deref(), Some(&b"v1"[..]));
}
/// When the dirty-write phase of a checkpoint fails, no pending delete may be
/// applied: the whole pending snapshot stays intact and the backend is left
/// untouched until a later checkpoint succeeds.
#[test]
fn dirty_write_failure_does_not_propagate_to_pending_delete() {
    let (inner, fp, tree) = setup_with_pending_delete();

    // Preconditions: setup must leave both dirty entries and pending deletes.
    let before = tree.stats().unwrap();
    let dirty_before = before.bm_dirty_count;
    let pending_before = before.bm_pending_delete_count;
    assert!(
        dirty_before > 0,
        "setup precondition: dirty entry queued (got {})",
        dirty_before,
    );
    assert!(
        pending_before > 0,
        "setup precondition: pending delete queued (got {})",
        pending_before,
    );

    // Fail the next write_blob so this checkpoint's dirty flush breaks.
    fp.arm_write(fp.write_calls.load(Ordering::SeqCst) + 1);
    let outcome = tree.checkpoint();
    assert!(
        outcome.is_err(),
        "checkpoint must surface the write_through failure",
    );

    let after = tree.stats().unwrap();
    assert!(
        after.bm_dirty_count >= 1,
        "failed dirty write must stay in `dirty` for next round (got {})",
        after.bm_dirty_count,
    );
    assert_eq!(
        after.bm_pending_delete_count, pending_before,
        "dirty failure must NOT apply any pending delete — whole \
         snapshot restored (got {} vs {})",
        after.bm_pending_delete_count, pending_before,
    );

    let backend_blob_count = inner.list_blobs().unwrap().len() as u32;
    let tree_blob_count = tree.stats().unwrap().blob_count;
    assert_eq!(
        backend_blob_count, tree_blob_count,
        "no manifest delete must have applied while dirty write failed",
    );

    // With the failpoint spent, the retry drains both queues.
    tree.checkpoint().unwrap();
    let done = tree.stats().unwrap();
    assert_eq!(done.bm_dirty_count, 0);
    assert_eq!(done.bm_pending_delete_count, 0);
}
/// A failure of the Sync issued before deletes are applied must restore the
/// entire pending snapshot and leave the backend untouched.
#[test]
fn pre_delete_sync_failure_restores_pending() {
    let (inner, fp, tree) = setup_with_pending_delete();
    let pending_before = tree.stats().unwrap().bm_pending_delete_count;
    assert!(pending_before > 0, "setup precondition");

    // Per this test's premise, the first flush issued from here on is the
    // pre-delete Sync; make it fail.
    fp.arm_flush(fp.flush_calls.load(Ordering::SeqCst) + 1);
    let outcome = tree.checkpoint();
    assert!(
        outcome.is_err(),
        "checkpoint must surface the pre-delete Sync failure",
    );

    let pending_after = tree.stats().unwrap().bm_pending_delete_count;
    assert_eq!(
        pending_after, pending_before,
        "pre-delete Sync failure must restore the entire pending \
         snapshot (got {} vs {})",
        pending_after, pending_before,
    );

    let backend_blob_count = inner.list_blobs().unwrap().len() as u32;
    let tree_blob_count = tree.stats().unwrap().blob_count;
    assert_eq!(
        backend_blob_count, tree_blob_count,
        "no manifest delete must have applied while pre-delete Sync failed",
    );

    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_pending_delete_count, 0);
}
/// The background checkpointer must retry on its own after a transient
/// `write_blob` failure, until the dirty entry is persisted and still readable.
#[test]
fn bg_checkpointer_recovers_from_transient_failure() {
    let inner: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    let fp = Arc::new(FailpointBackend::new(Arc::clone(&inner)));
    let fp_clone: Arc<dyn Backend> = fp.clone();
    // NOTE(review): this scratch directory is never used, because the tree
    // below is memory-backed. The old `let _ = dir;` did not drop it either,
    // so the binding lives to the end of the test. It is kept only so the
    // file's `tempdir` import stays in use; consider removing both together.
    let _scratch_dir = tempdir().unwrap();

    let mut cfg = TreeConfig::memory();
    cfg.memory_flush_on_write = false;
    cfg.checkpoint = CheckpointConfig {
        enabled: true,
        idle_interval: Duration::from_millis(10),
        dirty_blob_threshold: 1,
        auto_merge: false,
        ..CheckpointConfig::default()
    };
    let tree = Tree::open_with_backend(cfg, fp_clone).unwrap();
    tree.put(b"k1", b"v1").unwrap();

    // Fail the next write_blob; the background checkpointer has to retry.
    let writes_pre = fp.write_calls.load(Ordering::SeqCst);
    fp.arm_write(writes_pre + 1);

    // Poll until the dirty entry drains or the deadline passes.
    let deadline = Instant::now() + Duration::from_secs(3);
    loop {
        let dirty = tree.stats().unwrap().bm_dirty_count;
        if dirty == 0 {
            break;
        }
        assert!(
            Instant::now() < deadline,
            "bg checkpointer didn't recover from failpoint (dirty_count = {dirty})",
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // A drained dirty count alone does not prove the data survived the failed
    // write and the retry; read it back, as the synchronous retry test does.
    assert_eq!(
        tree.get(b"k1").unwrap().as_deref(),
        Some(&b"v1"[..]),
        "k1 must still read back after the bg checkpointer recovered",
    );
}