use std::io;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use holt::{AlignedBlobBuf, BlobStore, CheckpointConfig, MemoryBlobStore, Tree, TreeConfig};
/// A [`BlobStore`] decorator that forwards every call to `inner`, but can be armed to fail
/// the n-th `write_blob`, `delete_blob` or `flush` call with an injected I/O error.
///
/// Each `*_calls` counter is bumped on every call (including the one that fails), and each
/// `fail_*_at` slot holds the 1-based call index to fail, or `usize::MAX` when disarmed.
struct FailpointBlobStore {
    /// Store that all non-failing calls are forwarded to.
    inner: Arc<dyn BlobStore>,
    /// Number of `write_blob` calls observed so far.
    write_calls: AtomicUsize,
    /// 1-based `write_blob` call index to fail; `usize::MAX` means disarmed.
    fail_write_at: AtomicUsize,
    /// Number of `delete_blob` calls observed so far.
    delete_calls: AtomicUsize,
    /// 1-based `delete_blob` call index to fail; `usize::MAX` means disarmed.
    fail_delete_at: AtomicUsize,
    /// Number of `flush` calls observed so far.
    flush_calls: AtomicUsize,
    /// 1-based `flush` call index to fail; `usize::MAX` means disarmed.
    fail_flush_at: AtomicUsize,
    /// Set when an injected flush failure fires and cleared by the next successful inner
    /// flush; surfaced through `needs_flush`.
    flush_retry_pending: AtomicBool,
}
impl FailpointBlobStore {
    /// Wraps `inner` with every counter at zero and every failpoint disarmed.
    fn new(inner: Arc<dyn BlobStore>) -> Self {
        let zero = || AtomicUsize::new(0);
        let disarmed = || AtomicUsize::new(usize::MAX);
        Self {
            inner,
            delete_calls: zero(),
            flush_calls: zero(),
            write_calls: zero(),
            fail_delete_at: disarmed(),
            fail_flush_at: disarmed(),
            fail_write_at: disarmed(),
            flush_retry_pending: AtomicBool::new(false),
        }
    }

    /// Makes the `nth` (1-based, counted over the store's lifetime) `delete_blob` call fail.
    fn arm_delete(&self, nth: usize) {
        let slot = &self.fail_delete_at;
        slot.store(nth, Ordering::SeqCst);
    }

    /// Makes the `nth` (1-based, counted over the store's lifetime) `flush` call fail.
    fn arm_flush(&self, nth: usize) {
        let slot = &self.fail_flush_at;
        slot.store(nth, Ordering::SeqCst);
    }

    /// Makes the `nth` (1-based, counted over the store's lifetime) `write_blob` call fail.
    fn arm_write(&self, nth: usize) {
        let slot = &self.fail_write_at;
        slot.store(nth, Ordering::SeqCst);
    }

    /// Total `delete_blob` calls seen so far, failed ones included.
    fn delete_count(&self) -> usize {
        let counter = &self.delete_calls;
        counter.load(Ordering::SeqCst)
    }

    /// Total `flush` calls seen so far, failed ones included.
    fn flush_count(&self) -> usize {
        let counter = &self.flush_calls;
        counter.load(Ordering::SeqCst)
    }
}
fn failpoint_err(msg: &'static str) -> holt::Error {
holt::Error::BlobStoreIo(io::Error::other(msg))
}
impl BlobStore for FailpointBlobStore {
fn read_blob(&self, guid: holt::BlobGuid, dst: &mut AlignedBlobBuf) -> holt::Result<()> {
self.inner.read_blob(guid, dst)
}
fn write_blob(&self, guid: holt::BlobGuid, src: &AlignedBlobBuf) -> holt::Result<()> {
let n = self.write_calls.fetch_add(1, Ordering::SeqCst) + 1;
let armed = self.fail_write_at.load(Ordering::SeqCst);
if n == armed {
self.fail_write_at.store(usize::MAX, Ordering::SeqCst);
return Err(failpoint_err("failpoint: write_blob"));
}
self.inner.write_blob(guid, src)
}
fn delete_blob(&self, guid: holt::BlobGuid) -> holt::Result<()> {
let n = self.delete_calls.fetch_add(1, Ordering::SeqCst) + 1;
let armed = self.fail_delete_at.load(Ordering::SeqCst);
if n == armed {
self.fail_delete_at.store(usize::MAX, Ordering::SeqCst);
return Err(failpoint_err("failpoint: delete_blob"));
}
self.inner.delete_blob(guid)
}
fn list_blobs(&self) -> holt::Result<Vec<holt::BlobGuid>> {
self.inner.list_blobs()
}
fn flush(&self) -> holt::Result<()> {
let n = self.flush_calls.fetch_add(1, Ordering::SeqCst) + 1;
let armed = self.fail_flush_at.load(Ordering::SeqCst);
if n == armed {
self.fail_flush_at.store(usize::MAX, Ordering::SeqCst);
self.flush_retry_pending.store(true, Ordering::SeqCst);
return Err(failpoint_err("failpoint: flush"));
}
let result = self.inner.flush();
if result.is_ok() {
self.flush_retry_pending.store(false, Ordering::SeqCst);
}
result
}
fn needs_flush(&self) -> bool {
self.flush_retry_pending.load(Ordering::SeqCst) || self.inner.needs_flush()
}
fn has_blob(&self, guid: holt::BlobGuid) -> holt::Result<bool> {
self.inner.has_blob(guid)
}
}
/// A checkpoint on a tree with no dirty blobs and no pending deletes must not flush the
/// underlying store.
#[test]
fn clean_checkpoint_skips_flush_inner() {
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let fp = Arc::new(FailpointBlobStore::new(Arc::clone(&inner)));
    let fp_dyn: Arc<dyn BlobStore> = fp.clone();
    let mut cfg = TreeConfig::memory();
    cfg.memory_flush_on_write = false;
    let tree = Tree::open_with_blob_store(cfg, fp_dyn).unwrap();
    let stats = tree.stats().unwrap();
    assert_eq!(stats.bm_dirty_count, 0);
    assert_eq!(stats.bm_pending_delete_count, 0);
    let flushes_before = fp.flush_count();
    // Arm the next flush so a stray flush also turns the checkpoint into an error.
    fp.arm_flush(flushes_before + 1);
    let result = tree.checkpoint();
    // Check the flush counter before unwrapping. Otherwise a regression would panic on the
    // armed failpoint's error and this descriptive message would never be shown.
    assert_eq!(
        fp.flush_count(),
        flushes_before,
        "clean checkpoint must not issue a store flush",
    );
    result.unwrap();
}
/// Opens a tree over a failpoint-wrapped memory store and churns it so that pending
/// deletes are queued.
///
/// It writes 1000 keys with 1 KiB values, deletes the first 950, and compacts. Callers
/// assert that this leaves pending deletes (and dirty entries) queued.
///
/// Returns `(raw inner store, failpoint wrapper, tree)`.
fn setup_with_pending_delete() -> (Arc<dyn BlobStore>, Arc<FailpointBlobStore>, Tree) {
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let fp = Arc::new(FailpointBlobStore::new(Arc::clone(&inner)));
    let store: Arc<dyn BlobStore> = fp.clone();
    let mut cfg = TreeConfig::memory();
    cfg.memory_flush_on_write = false;
    let tree = Tree::open_with_blob_store(cfg, store).unwrap();

    let key_for = |i: u32| format!("k{i:05}");
    let payload = vec![b'z'; 1024];
    for key in (0..1000u32).map(key_for) {
        tree.put(key.as_bytes(), &payload).unwrap();
    }
    for key in (0..950u32).map(key_for) {
        let _ = tree.delete(key.as_bytes()).unwrap();
    }
    tree.compact().unwrap();

    (inner, fp, tree)
}
/// A `delete_blob` failure during checkpoint is surfaced, keeps the entry queued, and is
/// applied by the following checkpoint.
#[test]
fn pending_delete_execute_failure_is_retried_next_round() {
    let (inner, fp, tree) = setup_with_pending_delete();
    let queued = tree.stats().unwrap().bm_pending_delete_count;
    assert!(
        queued > 0,
        "setup must produce at least one pending delete (got {})",
        queued,
    );

    // Fail exactly the next delete_blob call.
    let next_delete = fp.delete_count() + 1;
    fp.arm_delete(next_delete);
    let first_round = tree.checkpoint();
    assert!(
        first_round.is_err(),
        "checkpoint must surface the first delete_blob failure",
    );
    let still_queued = tree.stats().unwrap().bm_pending_delete_count;
    assert!(still_queued > 0, "failed delete entry must stay queued for retry",);

    // The retry round must drain the queue and leave store and tree in agreement.
    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_pending_delete_count, 0);
    let manifest = inner.list_blobs().unwrap();
    let tree_blobs = tree.stats().unwrap().blob_count;
    assert_eq!(
        manifest.len() as u32,
        tree_blobs,
        "after retry, store manifest count = tree blob count",
    );
}
/// A failure of the post-delete Sync (the checkpoint's second store flush) is surfaced and
/// returns the applied delete entries to the pending queue, so the next checkpoint retries
/// them.
#[test]
fn pending_delete_sync_failure_keeps_state_for_retry() {
    let (inner, fp, tree) = setup_with_pending_delete();
    assert!(
        tree.stats().unwrap().bm_pending_delete_count > 0,
        "setup precondition: pending delete queued",
    );
    // Arm relative to flushes already issued, like the other tests in this file. `+ 1` is
    // the pre-delete Sync (see `pre_delete_sync_failure_restores_pending`); `+ 2` is the
    // post-delete Sync targeted here. An absolute index would silently hit a different flush
    // as soon as setup issues any flush of its own.
    let flushes_pre = fp.flush_count();
    fp.arm_flush(flushes_pre + 2);
    let result1 = tree.checkpoint();
    assert!(
        result1.is_err(),
        "checkpoint must surface the post-delete Sync failure",
    );
    assert_eq!(
        fp.fail_flush_at.load(Ordering::SeqCst),
        usize::MAX,
        "the armed post-delete Sync failpoint must be what failed the checkpoint",
    );
    assert!(
        tree.stats().unwrap().bm_pending_delete_count > 0,
        "post-delete Sync failure must restore applied entries to pending",
    );
    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_pending_delete_count, 0);
    let store_blobs = inner.list_blobs().unwrap();
    let stats = tree.stats().unwrap();
    assert_eq!(
        store_blobs.len() as u32,
        stats.blob_count,
        "after retry, store manifest count = tree blob count",
    );
}
/// A `write_blob` failure while flushing a dirty entry is surfaced, leaves the entry dirty,
/// and the next checkpoint writes it so the value stays readable.
#[test]
fn dirty_write_failure_is_retried_next_round() {
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let fp = Arc::new(FailpointBlobStore::new(Arc::clone(&inner)));
    let store: Arc<dyn BlobStore> = fp.clone();
    let mut cfg = TreeConfig::memory();
    cfg.memory_flush_on_write = false;
    let tree = Tree::open_with_blob_store(cfg, store).unwrap();
    tree.put(b"k1", b"v1").unwrap();

    // Fail the first write issued after the put.
    let next_write = fp.write_calls.load(Ordering::SeqCst) + 1;
    fp.arm_write(next_write);
    let failed_round = tree.checkpoint();
    assert!(
        failed_round.is_err(),
        "first checkpoint should surface failpoint write error"
    );
    let dirty_after_failure = tree.stats().unwrap().bm_dirty_count;
    assert!(
        dirty_after_failure >= 1,
        "failed write must leave dirty entry for retry",
    );

    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_dirty_count, 0);
    let value = tree.get(b"k1").unwrap();
    assert_eq!(value.as_deref(), Some(&b"v1"[..]));
}
/// When a dirty write fails, the checkpoint must restore its whole snapshot. No pending
/// delete may be applied or even attempted, and the next checkpoint completes both queues.
#[test]
fn dirty_write_failure_does_not_propagate_to_pending_delete() {
    let (_inner, fp, tree) = setup_with_pending_delete();
    let before = tree.stats().unwrap();
    assert!(
        before.bm_dirty_count > 0,
        "setup precondition: dirty entry queued (got {})",
        before.bm_dirty_count,
    );
    assert!(
        before.bm_pending_delete_count > 0,
        "setup precondition: pending delete queued (got {})",
        before.bm_pending_delete_count,
    );
    let pending_before = before.bm_pending_delete_count;
    let deletes_before = fp.delete_count();

    // Fail the first write_blob of the checkpoint.
    let next_write = fp.write_calls.load(Ordering::SeqCst) + 1;
    fp.arm_write(next_write);
    let outcome = tree.checkpoint();
    assert!(
        outcome.is_err(),
        "checkpoint must surface the write_through failure",
    );

    let after = tree.stats().unwrap();
    assert!(
        after.bm_dirty_count >= 1,
        "failed dirty write must stay in `dirty` for next round (got {})",
        after.bm_dirty_count,
    );
    assert_eq!(
        after.bm_pending_delete_count, pending_before,
        "dirty failure must NOT apply any pending delete — whole \
         snapshot restored (got {} vs {})",
        after.bm_pending_delete_count, pending_before,
    );
    assert_eq!(
        fp.delete_count(),
        deletes_before,
        "no manifest delete attempt must run while dirty write failed",
    );

    tree.checkpoint().unwrap();
    let done = tree.stats().unwrap();
    assert_eq!(done.bm_dirty_count, 0);
    assert_eq!(done.bm_pending_delete_count, 0);
}
/// A failure of the pre-delete Sync (the checkpoint's first store flush) must restore the
/// entire pending-delete snapshot and apply no manifest delete. The next checkpoint then
/// drains the queue.
#[test]
fn pre_delete_sync_failure_restores_pending() {
    let (inner, fp, tree) = setup_with_pending_delete();
    let pending_before = tree.stats().unwrap().bm_pending_delete_count;
    assert!(pending_before > 0, "setup precondition");

    // Fail the first flush of the checkpoint.
    let next_flush = fp.flush_count() + 1;
    fp.arm_flush(next_flush);
    let outcome = tree.checkpoint();
    assert!(
        outcome.is_err(),
        "checkpoint must surface the pre-delete Sync failure",
    );

    let pending_after = tree.stats().unwrap().bm_pending_delete_count;
    assert_eq!(
        pending_after, pending_before,
        "pre-delete Sync failure must restore the entire pending \
         snapshot (got {} vs {})",
        pending_after, pending_before,
    );
    let manifest = inner.list_blobs().unwrap();
    let tree_blobs = tree.stats().unwrap().blob_count;
    assert_eq!(
        manifest.len() as u32,
        tree_blobs,
        "no manifest delete must have applied while pre-delete Sync failed",
    );

    tree.checkpoint().unwrap();
    assert_eq!(tree.stats().unwrap().bm_pending_delete_count, 0);
}
/// The background checkpointer must survive a one-shot `write_blob` failure and retire the
/// dirty entry on a later round without any explicit `checkpoint()` call.
#[test]
fn bg_checkpointer_recovers_from_transient_failure() {
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let fp = Arc::new(FailpointBlobStore::new(Arc::clone(&inner)));
    let fp_clone: Arc<dyn BlobStore> = fp.clone();
    let mut cfg = TreeConfig::memory();
    cfg.memory_flush_on_write = false;
    cfg.checkpoint = CheckpointConfig {
        enabled: true,
        idle_interval: Duration::from_millis(10),
        dirty_blob_threshold: 1,
        auto_merge: false,
        ..CheckpointConfig::default()
    };
    let tree = Tree::open_with_blob_store(cfg, fp_clone).unwrap();
    // Arm BEFORE the put, as `bg_checkpointer_retries_sync_after_dirty_retired` does. With
    // `dirty_blob_threshold: 1` the background thread may checkpoint right after the put.
    // Arming afterwards races it and can let the test pass without ever hitting the failure.
    let writes_pre = fp.write_calls.load(Ordering::SeqCst);
    fp.arm_write(writes_pre + 1);
    tree.put(b"k1", b"v1").unwrap();
    let deadline = Instant::now() + Duration::from_secs(3);
    loop {
        let dirty = tree.stats().unwrap().bm_dirty_count;
        // The failpoint disarms itself (resets to usize::MAX) when it fires.
        let fired = fp.fail_write_at.load(Ordering::SeqCst) == usize::MAX;
        let writes = fp.write_calls.load(Ordering::SeqCst);
        // Recovered means: the failure was injected, at least one write followed the failed
        // one, and nothing is left dirty.
        if dirty == 0 && fired && writes >= writes_pre + 2 {
            break;
        }
        assert!(
            Instant::now() < deadline,
            "bg checkpointer didn't recover from failpoint \
             (dirty_count = {dirty}, fired = {fired}, writes = {writes}, writes_pre = {writes_pre})",
        );
        std::thread::sleep(Duration::from_millis(20));
    }
}
/// If the background checkpointer's store flush fails after the dirty entry has been
/// retired, a later round must still retry the flush until the store no longer needs one.
#[test]
fn bg_checkpointer_retries_sync_after_dirty_retired() {
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let fp = Arc::new(FailpointBlobStore::new(Arc::clone(&inner)));
    let store: Arc<dyn BlobStore> = fp.clone();
    let mut cfg = TreeConfig::memory();
    cfg.memory_flush_on_write = false;
    cfg.checkpoint = CheckpointConfig {
        enabled: true,
        idle_interval: Duration::from_millis(10),
        dirty_blob_threshold: 1,
        auto_merge: false,
        ..CheckpointConfig::default()
    };
    let tree = Tree::open_with_blob_store(cfg, store).unwrap();

    // Arm the next flush before creating dirty state so the bg thread cannot slip past it.
    let flushes_pre = fp.flush_count();
    fp.arm_flush(flushes_pre + 1);
    tree.put(b"k1", b"v1").unwrap();

    let deadline = Instant::now() + Duration::from_secs(3);
    loop {
        let dirty = tree.stats().unwrap().bm_dirty_count;
        // Settled: dirty retired, no flush owed, and a flush ran after the failed one.
        let settled =
            dirty == 0 && !fp.needs_flush() && fp.flush_count() >= flushes_pre + 2;
        if settled {
            break;
        }
        assert!(
            Instant::now() < deadline,
            "bg checkpointer did not retry Sync after dirty retired \
             (dirty={dirty}, needs_flush={}, flushes={})",
            fp.needs_flush(),
            fp.flush_count(),
        );
        std::thread::sleep(Duration::from_millis(20));
    }
}