#![cfg(noxu_shuttle)]
use std::sync::atomic::Ordering;
use std::time::Duration;
use noxu_log::fsync_manager::FsyncManager;
use noxu_util::SimClock;
use noxu_util::dst_invariants::{
assert_durable_covers_commit, assert_fsynced_never_decreases,
};
use noxu_util::dst_sync_pl::{advance_and_fire, install_sim_clock};
use shuttle::sync::Arc;
use shuttle::sync::atomic::{AtomicU64, AtomicUsize};
/// Iteration count passed to every `shuttle::check_random` call in this file.
const ITERATIONS: usize = 5_000;
/// Checks durable coverage and the fsync execution count under random
/// schedules.
///
/// `N` committer threads each allocate an LSN, publish it to a shared
/// "written" high-water mark, and call `FsyncManager::flush_and_sync` with a
/// closure that stands in for the real fsync. The closure counts its own
/// executions and advances a shared durable watermark.
///
/// Asserted per committer: the LSN returned by `flush_and_sync` and the
/// shared watermark both cover the committer's own LSN.
/// Asserted after all threads join: the closure ran between 1 and `N` times,
/// and the final watermark covers the highest LSN handed out.
#[test]
fn fsync_coalescing_and_coverage_hold() {
    shuttle::check_random(
        || {
            const N: usize = 3;

            // Simulated clock shared by the global installer and the manager.
            let clock = Arc::new(SimClock::new(0));
            install_sim_clock(Arc::clone(&clock));
            // NOTE(review): the two zero arguments are presumably the
            // group-commit tuning knobs — confirm against `with_clock`.
            let manager = Arc::new(FsyncManager::with_clock(
                0,
                0,
                Arc::clone(&clock) as Arc<dyn noxu_util::Clock>,
            ));

            // Shared model state: LSN allocator, highest LSN written so far,
            // durable watermark, and number of fsync-closure executions.
            let lsn_source = Arc::new(AtomicU64::new(1));
            let written = Arc::new(AtomicU64::new(0));
            let watermark = Arc::new(AtomicU64::new(0));
            let fsync_calls = Arc::new(AtomicUsize::new(0));

            let mut committers = Vec::with_capacity(N);
            for _ in 0..N {
                let manager = Arc::clone(&manager);
                let lsn_source = Arc::clone(&lsn_source);
                let written = Arc::clone(&written);
                let watermark = Arc::clone(&watermark);
                let fsync_calls = Arc::clone(&fsync_calls);
                committers.push(shuttle::thread::spawn(move || {
                    // "Write" the record: take an LSN and publish it.
                    let my_lsn = lsn_source.fetch_add(1, Ordering::SeqCst);
                    bump_max(&written, my_lsn);

                    // Handles moved into the fsync closure.
                    let watermark_in = Arc::clone(&watermark);
                    let written_in = Arc::clone(&written);
                    let calls_in = Arc::clone(&fsync_calls);
                    let durable = manager
                        .flush_and_sync(move || {
                            calls_in.fetch_add(1, Ordering::SeqCst);
                            // Everything written up to now is what this
                            // simulated fsync makes durable.
                            let covered = written_in.load(Ordering::SeqCst);
                            let before = watermark_in.load(Ordering::SeqCst);
                            let after = before.max(covered);
                            watermark_in.store(after, Ordering::SeqCst);
                            assert_fsynced_never_decreases(before, after);
                            Ok(covered)
                        })
                        .expect("no fault injected: fsync must succeed");

                    // Both the returned LSN and the shared watermark must
                    // cover this committer's record.
                    assert_durable_covers_commit(durable.as_u64(), my_lsn);
                    let global = watermark.load(Ordering::SeqCst);
                    assert_durable_covers_commit(global, my_lsn);
                }));
            }
            committers.into_iter().for_each(|h| h.join().unwrap());

            // At least one fsync must have run, and never more than one per
            // committer.
            let execs = fsync_calls.load(Ordering::SeqCst);
            assert!(
                (1..=N).contains(&execs),
                "fsync executions {execs} out of range 1..={N}: a redundant \
                 (double) fsync or a missing one indicates a coalescing bug"
            );

            // The allocator started at 1, so the last LSN handed out is
            // `next - 1`.
            let final_flushed = watermark.load(Ordering::SeqCst);
            let highest = lsn_source.load(Ordering::SeqCst) - 1;
            assert!(
                final_flushed >= highest,
                "final durable watermark {final_flushed} < highest commit \
                 LSN {highest}: a committed write was left unsynced"
            );
        },
        ITERATIONS,
    );
}
/// Checks that a failing fsync is reported to every committer.
///
/// `N` threads call `FsyncManager::flush_and_sync` with a closure that always
/// returns an `io::Error` carrying the text `"fsync EIO"`. Each thread must
/// get an `Err` whose message contains that text. After all threads join,
/// the error count must equal `N` and the closure must have been attempted
/// between 1 and `N` times.
#[test]
fn fsync_failure_fails_all_waiters() {
    shuttle::check_random(
        || {
            const N: usize = 3;

            let clock = Arc::new(SimClock::new(0));
            install_sim_clock(Arc::clone(&clock));
            // NOTE(review): the two zero arguments are presumably the
            // group-commit tuning knobs — confirm against `with_clock`.
            let manager = Arc::new(FsyncManager::with_clock(
                0,
                0,
                Arc::clone(&clock) as Arc<dyn noxu_util::Clock>,
            ));

            // How many times the failing closure ran / how many committers
            // saw an error.
            let attempts = Arc::new(AtomicUsize::new(0));
            let errors = Arc::new(AtomicUsize::new(0));

            let mut committers = Vec::with_capacity(N);
            for _ in 0..N {
                let manager = Arc::clone(&manager);
                let attempts = Arc::clone(&attempts);
                let errors = Arc::clone(&errors);
                committers.push(shuttle::thread::spawn(move || {
                    let attempt_counter = Arc::clone(&attempts);
                    let outcome = manager.flush_and_sync(move || {
                        attempt_counter.fetch_add(1, Ordering::SeqCst);
                        Err::<u64, _>(std::io::Error::other("fsync EIO"))
                    });

                    // Success is never acceptable here: the only fsync
                    // closure in play always fails.
                    let Err(e) = outcome else {
                        panic!("a committer returned Ok despite a failed fsync");
                    };
                    assert!(
                        e.to_string().contains("fsync EIO"),
                        "error must carry the leader failure: {e}"
                    );
                    errors.fetch_add(1, Ordering::SeqCst);
                }));
            }
            committers.into_iter().for_each(|h| h.join().unwrap());

            assert_eq!(
                errors.load(Ordering::SeqCst),
                N,
                "every committer must observe the fsync failure"
            );

            let a = attempts.load(Ordering::SeqCst);
            assert!(
                (1..=N).contains(&a),
                "fsync attempts {a} out of range 1..={N} under failure"
            );
        },
        ITERATIONS,
    );
}
/// Checks that committers finish when time only moves via the simulated
/// clock.
///
/// The manager is built with `with_clock(2, 5, ..)`. `N` committer threads
/// allocate an LSN, publish it, and call `flush_and_sync` with a closure that
/// advances a shared durable watermark. Meanwhile the main thread repeatedly
/// advances the sim clock by 10 ms, fires timers via `advance_and_fire`, and
/// yields, until every committer has reported done.
///
/// Fails if the driver loop reaches 2000 steps (treated as a hang), if any
/// committer's returned durable LSN does not cover its own LSN, or if the
/// final watermark is below the highest LSN handed out.
#[test]
fn group_commit_wait_holds_under_sim_clock() {
    shuttle::check_random(
        || {
            const N: usize = 3;

            let clock = Arc::new(SimClock::new(0));
            install_sim_clock(Arc::clone(&clock));
            // NOTE(review): 2 and 5 are presumably the group-commit
            // threshold and wait interval — confirm against `with_clock`.
            let manager = Arc::new(FsyncManager::with_clock(
                2,
                5,
                Arc::clone(&clock) as Arc<dyn noxu_util::Clock>,
            ));

            // Shared model state: LSN allocator, highest LSN written so far,
            // durable watermark, and count of finished committers.
            let lsn_source = Arc::new(AtomicU64::new(1));
            let written = Arc::new(AtomicU64::new(0));
            let watermark = Arc::new(AtomicU64::new(0));
            let done = Arc::new(AtomicUsize::new(0));

            let mut committers = Vec::with_capacity(N);
            for _ in 0..N {
                let manager = Arc::clone(&manager);
                let lsn_source = Arc::clone(&lsn_source);
                let written = Arc::clone(&written);
                let watermark = Arc::clone(&watermark);
                let done = Arc::clone(&done);
                committers.push(shuttle::thread::spawn(move || {
                    // "Write" the record: take an LSN and publish it.
                    let my_lsn = lsn_source.fetch_add(1, Ordering::SeqCst);
                    bump_max(&written, my_lsn);

                    // Handles moved into the fsync closure.
                    let watermark_in = Arc::clone(&watermark);
                    let written_in = Arc::clone(&written);
                    let durable = manager
                        .flush_and_sync(move || {
                            let covered = written_in.load(Ordering::SeqCst);
                            let before = watermark_in.load(Ordering::SeqCst);
                            let after = before.max(covered);
                            watermark_in.store(after, Ordering::SeqCst);
                            assert_fsynced_never_decreases(before, after);
                            Ok(covered)
                        })
                        .expect("no fault injected: fsync must succeed");

                    assert_durable_covers_commit(durable.as_u64(), my_lsn);
                    done.fetch_add(1, Ordering::SeqCst);
                }));
            }

            // Drive simulated time from the main thread until every
            // committer has finished; the step bound turns a hang into a
            // test failure.
            const MAX_STEPS: u32 = 2000;
            let tick = Duration::from_millis(10);
            let mut steps: u32 = 0;
            loop {
                if done.load(Ordering::SeqCst) >= N {
                    break;
                }
                advance_and_fire(&clock, tick);
                shuttle::thread::yield_now();
                steps += 1;
                assert!(
                    steps < MAX_STEPS,
                    "group-commit wait never resolved (hang)"
                );
            }
            committers.into_iter().for_each(|h| h.join().unwrap());

            // The allocator started at 1, so the last LSN handed out is
            // `next - 1`.
            let highest = lsn_source.load(Ordering::SeqCst) - 1;
            let final_flushed = watermark.load(Ordering::SeqCst);
            assert!(
                final_flushed >= highest,
                "final durable watermark {final_flushed} < highest commit \
                 LSN {highest}: a committed write was left unsynced"
            );
        },
        ITERATIONS,
    );
}
/// Raises the value in `cell` to at least `v`; never lowers it.
///
/// Reads the current value once, then retries a `SeqCst` compare-exchange
/// until either the swap succeeds or the observed value is already `>= v`
/// (for example because another thread stored something at least as large).
fn bump_max(cell: &AtomicU64, v: u64) {
    let mut observed = cell.load(Ordering::SeqCst);
    loop {
        // Nothing to do once the cell already holds `v` or more.
        if observed >= v {
            return;
        }
        match cell.compare_exchange(observed, v, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return,
            // Lost the race: retry against the value that beat us.
            Err(actual) => observed = actual,
        }
    }
}