#![allow(clippy::panic)]
use batpak::prelude::*;
use batpak::store::cursor::{CursorWorkerAction, CursorWorkerConfig};
use batpak::store::{RestartPolicy, Store, StoreConfig};
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tempfile::TempDir;
fn test_config(dir: &TempDir) -> StoreConfig {
StoreConfig::new(dir.path())
.with_enable_checkpoint(false)
.with_enable_mmap_index(false)
.with_restart_policy(RestartPolicy::Bounded {
max_restarts: 2,
within_ms: 5_000,
})
.with_sync_every_n_events(1)
}
#[test]
fn cursor_worker_restarts_from_last_committed_checkpoint_after_panic() {
let dir = TempDir::new().expect("temp dir");
let store = Arc::new(Store::open(test_config(&dir)).expect("open store"));
let coord = Coordinate::new("entity:cursor-worker", "scope:restart").expect("coord");
let kind = EventKind::custom(0xF, 7);
for n in 0..3u32 {
store
.append(&coord, kind, &serde_json::json!({"n": n}))
.expect("append seed event");
}
let seen = Arc::new(Mutex::new(BTreeMap::<u64, usize>::new()));
let panic_once = Arc::new(AtomicBool::new(true));
let worker = store
.cursor_worker(
&Region::entity("entity:cursor-worker"),
CursorWorkerConfig {
batch_size: 1,
idle_sleep: Duration::from_millis(1),
restart: RestartPolicy::Bounded {
max_restarts: 2,
within_ms: 5_000,
},
},
{
let seen = Arc::clone(&seen);
let panic_once = Arc::clone(&panic_once);
move |batch, _store| {
let seq = batch[0].global_sequence;
let mut counts = seen.lock().expect("counts mutex");
*counts.entry(seq).or_insert(0) += 1;
drop(counts);
if seq == 1 && panic_once.swap(false, Ordering::SeqCst) {
panic!("intentional cursor worker panic after first checkpoint");
}
if seq == 2 {
CursorWorkerAction::Stop
} else {
CursorWorkerAction::Continue
}
}
},
)
.expect("spawn worker");
let deadline = Instant::now() + Duration::from_secs(5);
loop {
let snapshot = seen.lock().expect("counts mutex").clone();
if snapshot.get(&0) == Some(&1)
&& snapshot.get(&1) == Some(&2)
&& snapshot.get(&2) == Some(&1)
{
break;
}
assert!(
Instant::now() < deadline,
"PROPERTY: cursor worker must restart from the last committed checkpoint after panic.\n\
Expected sequence counts {{0:1, 1:2, 2:1}}, got {snapshot:?}."
);
std::thread::sleep(Duration::from_millis(10));
}
worker.join().expect("join worker");
let snapshot = seen.lock().expect("counts mutex").clone();
assert_eq!(
snapshot.get(&0),
Some(&1),
"first committed batch should not be replayed after restart"
);
assert_eq!(
snapshot.get(&1),
Some(&2),
"failed batch must be retried exactly once after restart"
);
assert_eq!(
snapshot.get(&2),
Some(&1),
"subsequent batch should run once after restart recovery"
);
let store = match Arc::try_unwrap(store) {
Ok(store) => store,
Err(_) => panic!("PROPERTY: cursor worker should release the last Arc"),
};
store.close().expect("close store");
}
#[test]
fn cursor_worker_exits_cleanly_when_restart_budget_exhausted() {
let dir = TempDir::new().expect("temp dir");
let store = Arc::new(
Store::open(
StoreConfig::new(dir.path())
.with_enable_checkpoint(false)
.with_enable_mmap_index(false)
.with_restart_policy(RestartPolicy::Bounded {
max_restarts: 1,
within_ms: 5_000,
})
.with_sync_every_n_events(1),
)
.expect("open store"),
);
let coord = Coordinate::new("entity:budget-exhausted", "scope:test").expect("coord");
let kind = EventKind::custom(0xF, 7);
for n in 0..3u32 {
store
.append(&coord, kind, &serde_json::json!({"n": n}))
.expect("append seed event");
}
let worker = store
.cursor_worker(
&Region::entity("entity:budget-exhausted"),
CursorWorkerConfig {
batch_size: 1,
idle_sleep: Duration::from_millis(1),
restart: RestartPolicy::Bounded {
max_restarts: 1,
within_ms: 5_000,
},
},
|_batch, _store| {
panic!("intentional panic to exhaust restart budget");
},
)
.expect("spawn worker");
worker.join().expect("join worker after budget exhaustion");
let receipt = store
.append(&coord, kind, &serde_json::json!({"after": true}))
.expect("append after worker exit");
assert!(
receipt.sequence >= 3,
"PROPERTY: store must remain usable after cursor worker exhausts its restart budget. \
Expected sequence >= 3, got {}.",
receipt.sequence
);
let store = match Arc::try_unwrap(store) {
Ok(store) => store,
Err(_) => {
panic!("PROPERTY: cursor worker should release the last Arc after budget exhaustion")
}
};
store.close().expect("close store");
}