#![cfg(noxu_shuttle)]
use std::sync::atomic::Ordering;
use noxu_log::fsync_manager::FsyncManager;
use noxu_util::dst_invariants::{
assert_durable_covers_commit, assert_fsynced_never_decreases,
};
use shuttle::sync::Arc;
use shuttle::sync::atomic::{AtomicU64, AtomicUsize};
const ITERATIONS: usize = 5_000;
/// Tripwire: runs two concurrent `flush_and_sync` callers under shuttle's
/// random scheduler and expects the exploration to panic.
///
/// The whole `check_random` run is wrapped in `catch_unwind`, so this test
/// passes only when at least one explored schedule fails. Per the assertion
/// message, the failure being tracked is the timeout-masked group-commit
/// orphan; once the hand-off is timeout-independent this test will start
/// failing and should be deleted in favour of
/// `fsync_coalescing_and_coverage_hold`.
#[test]
fn shuttle_catches_the_lost_wakeup() {
    let outcome = std::panic::catch_unwind(|| {
        // One shuttle execution: two threads go through the same manager with
        // a trivial fsync closure, then the spawning thread joins both.
        let scenario = || {
            let manager = Arc::new(FsyncManager::new(0, 0));
            let mut workers = Vec::with_capacity(2);
            for _ in 0..2 {
                let manager = Arc::clone(&manager);
                workers.push(shuttle::thread::spawn(move || {
                    // The result is deliberately discarded here.
                    let _ = manager.flush_and_sync(|| Ok(0));
                }));
            }
            workers
                .into_iter()
                .for_each(|worker| worker.join().unwrap());
        };
        shuttle::check_random(scenario, ITERATIONS);
    });

    assert!(
        outcome.is_err(),
        "shuttle should have caught the timeout-masked group-commit orphan; \
         if the hand-off is now timeout-independent, un-ignore \
         fsync_coalescing_and_coverage_hold and delete this tripwire"
    );
}
/// Exercises `N` concurrent committers against one `FsyncManager` under
/// shuttle's random scheduler (currently ignored; see the attribute message).
///
/// Each spawned thread takes the next LSN from a shared counter, raises the
/// shared snapshot to it with `bump_max`, and calls `flush_and_sync` with a
/// closure that counts its own executions, advances the shared flushed
/// watermark to `max(snapshot, watermark)`, and returns the snapshot value.
///
/// Checks performed:
/// * inside the fsync closure, the old/new watermark pair is passed to
///   `assert_fsynced_never_decreases`;
/// * in each committer, both the value returned by `flush_and_sync` and the
///   shared watermark read afterwards are checked against the committer's own
///   LSN via `assert_durable_covers_commit`;
/// * after all threads are joined, the fsync closure must have run between
///   1 and `N` times, and the final watermark must be at least the highest
///   LSN handed out.
#[test]
#[ignore = "group-commit liveness depends on fsync_timeout; shuttle cannot \
            model timeouts (see module docs). Enable once the hand-off is \
            timeout-independent."]
fn fsync_coalescing_and_coverage_hold() {
    let scenario = || {
        const N: usize = 3;

        let manager = Arc::new(FsyncManager::new(0, 0));
        // Next LSN to hand out; starts at 1 so 0 can mean "nothing yet".
        let lsn_source = Arc::new(AtomicU64::new(1));
        // Highest LSN any committer has announced so far.
        let snapshot = Arc::new(AtomicU64::new(0));
        // Watermark written by the fsync closure.
        let watermark = Arc::new(AtomicU64::new(0));
        // How many times the fsync closure actually ran.
        let fsync_runs = Arc::new(AtomicUsize::new(0));

        let mut committers = Vec::with_capacity(N);
        for _ in 0..N {
            let manager = Arc::clone(&manager);
            let lsn_source = Arc::clone(&lsn_source);
            let snapshot = Arc::clone(&snapshot);
            let watermark = Arc::clone(&watermark);
            let fsync_runs = Arc::clone(&fsync_runs);
            committers.push(shuttle::thread::spawn(move || {
                let commit_lsn = lsn_source.fetch_add(1, Ordering::SeqCst);
                bump_max(&snapshot, commit_lsn);

                // Separate handles for the fsync closure, which is `move`.
                let fsync_watermark = Arc::clone(&watermark);
                let fsync_snapshot = Arc::clone(&snapshot);
                let fsync_counter = Arc::clone(&fsync_runs);
                let durable = manager
                    .flush_and_sync(move || {
                        fsync_counter.fetch_add(1, Ordering::SeqCst);
                        let covered = fsync_snapshot.load(Ordering::SeqCst);
                        let before = fsync_watermark.load(Ordering::SeqCst);
                        let after = std::cmp::max(covered, before);
                        fsync_watermark.store(after, Ordering::SeqCst);
                        assert_fsynced_never_decreases(before, after);
                        Ok(covered)
                    })
                    .expect("no fault injected: fsync must succeed");

                assert_durable_covers_commit(durable.as_u64(), commit_lsn);
                let observed = watermark.load(Ordering::SeqCst);
                assert_durable_covers_commit(observed, commit_lsn);
            }));
        }
        committers
            .into_iter()
            .for_each(|committer| committer.join().unwrap());

        let execs = fsync_runs.load(Ordering::SeqCst);
        assert!(
            (1..=N).contains(&execs),
            "fsync executions {execs} out of range 1..={N}: a redundant \
             (double) fsync or a missing one indicates a coalescing bug"
        );

        let final_flushed = watermark.load(Ordering::SeqCst);
        // The counter holds the *next* LSN, so the last one issued is one less.
        let highest = lsn_source.load(Ordering::SeqCst) - 1;
        assert!(
            final_flushed >= highest,
            "final durable watermark {final_flushed} < highest commit \
             LSN {highest}: a committed write was left unsynced"
        );
    };

    shuttle::check_random(scenario, ITERATIONS);
}
/// Raises `cell` to at least `v`; a cell already holding a value `>= v` is
/// left untouched.
///
/// Implemented as one load followed by a `compare_exchange` retry loop: a
/// failed exchange hands back the value currently stored, which is compared
/// against `v` again before the next attempt.
fn bump_max(cell: &AtomicU64, v: u64) {
    let mut seen = cell.load(Ordering::SeqCst);
    loop {
        if seen >= v {
            // Someone already published a value at least as large.
            return;
        }
        match cell.compare_exchange(seen, v, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return,
            Err(actual) => seen = actual,
        }
    }
}