#![cfg(feature = "persistent-artrie")]
use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use loom::sync::{Arc, Mutex, RwLock};
use loom::thread;
/// Highest log sequence number the model tracks. Per-LSN arrays in this file
/// have `MAX_LSN + 1` slots and are indexed directly by LSN; the loops start
/// at 1, so slot 0 is never set.
const MAX_LSN: usize = 2;
/// Shared state of the write / checkpoint / recovery model exercised by the
/// loom tests in this file.
///
/// Every per-LSN array has `MAX_LSN + 1` slots and is indexed directly by LSN.
struct Model {
// Initialised to 1 in `new` and not read anywhere in the visible code, hence
// the lint allowance. NOTE(review): presumably kept to mirror the LSN
// allocator of the modelled system — confirm.
#[allow(dead_code)]
next_lsn: AtomicUsize,
/// `appended[l]` is set as the first step of `order_a_write(l)`, before the
/// write becomes visible. `recovered_set` replays appended entries that lie
/// past the checkpoint and inside the retained range.
appended: [AtomicBool; MAX_LSN + 1],
/// `committed[l]` is set by `order_a_write(l)` only after a root snapshot
/// with `visible[l]` has been installed, while `root_cas_lock` is still held.
committed: [AtomicBool; MAX_LSN + 1],
/// Currently published snapshot; writers replace it wholesale with a
/// modified copy, and `checkpoint` captures it.
root: RwLock<Arc<RootSnapshot>>,
/// Serialises the read-copy-replace of `root` (and the following
/// `committed` store) in `order_a_write`.
root_cas_lock: Mutex<()>,
/// Image written by `checkpoint`; the starting point of `recovered_set`.
durable_ckpt: RwLock<[bool; MAX_LSN + 1]>,
/// Raised (never lowered) by `checkpoint` to its target LSN; recovery only
/// replays appended entries strictly above it.
checkpoint_lsn: AtomicUsize,
/// Raised (never lowered) by `checkpoint` to `checkpoint_lsn + 1`; recovery
/// only replays appended entries at or above it.
wal_retained_from: AtomicUsize,
}
/// Immutable snapshot published through `Model::root`. Writers clone the
/// current snapshot, flip one entry, and install the copy.
#[derive(Clone)]
struct RootSnapshot {
/// `visible[l]` is true once the write with LSN `l` has been published.
visible: [bool; MAX_LSN + 1],
}
impl Model {
fn new() -> Self {
Model {
next_lsn: AtomicUsize::new(1),
appended: Default::default(),
committed: Default::default(),
root: RwLock::new(Arc::new(RootSnapshot {
visible: [false; MAX_LSN + 1],
})),
root_cas_lock: Mutex::new(()),
durable_ckpt: RwLock::new([false; MAX_LSN + 1]),
checkpoint_lsn: AtomicUsize::new(0),
wal_retained_from: AtomicUsize::new(1),
}
}
fn watermark(&self) -> usize {
let mut w = 0;
for l in 1..=MAX_LSN {
if self.committed[l].load(Ordering::Acquire) {
w = l;
} else {
break; }
}
w
}
fn appended_frontier(&self) -> usize {
let mut f = 0;
for l in 1..=MAX_LSN {
if self.appended[l].load(Ordering::Acquire) {
f = l;
}
}
f
}
fn order_a_write(&self, lsn: usize) {
self.appended[lsn].store(true, Ordering::Release);
{
let _cas = self.root_cas_lock.lock().expect("root cas lock");
let current = self.root.read().expect("root read").clone();
let mut next = (*current).clone();
next.visible[lsn] = true;
*self.root.write().expect("root write") = Arc::new(next);
self.committed[lsn].store(true, Ordering::Release);
}
}
fn checkpoint(&self, use_watermark: bool) {
let target = if use_watermark {
self.watermark()
} else {
self.appended_frontier()
};
let snap = self.root.read().expect("root read").clone();
let mut image = [false; MAX_LSN + 1];
for l in 1..=MAX_LSN {
if snap.visible[l] && l <= target {
image[l] = true;
}
}
*self.durable_ckpt.write().expect("ckpt write") = image;
loop {
let cur = self.checkpoint_lsn.load(Ordering::Acquire);
let next = cur.max(target);
if cur == next
|| self
.checkpoint_lsn
.compare_exchange(cur, next, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
}
let ckpt_lsn = self.checkpoint_lsn.load(Ordering::Acquire);
loop {
let cur = self.wal_retained_from.load(Ordering::Acquire);
let next = cur.max(ckpt_lsn + 1);
if cur == next
|| self
.wal_retained_from
.compare_exchange(cur, next, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
}
}
fn recovered_set(&self) -> [bool; MAX_LSN + 1] {
let durable_ckpt = *self.durable_ckpt.read().expect("ckpt read");
let ckpt_lsn = self.checkpoint_lsn.load(Ordering::Acquire);
let retained_from = self.wal_retained_from.load(Ordering::Acquire);
let mut recovered = durable_ckpt;
for l in 1..=MAX_LSN {
if self.appended[l].load(Ordering::Acquire) && l > ckpt_lsn && l >= retained_from {
recovered[l] = true;
}
}
recovered
}
fn committed_set(&self) -> [bool; MAX_LSN + 1] {
let mut c = [false; MAX_LSN + 1];
for l in 1..=MAX_LSN {
c[l] = self.committed[l].load(Ordering::Acquire);
}
c
}
fn first_lost_committed_write(&self) -> Option<usize> {
let committed = self.committed_set();
let recovered = self.recovered_set();
(1..=MAX_LSN).find(|&l| committed[l] && !recovered[l])
}
}
/// Runs two writers (LSN 1 and LSN 2) concurrently with one checkpointer on a
/// fresh model, waits for all three threads, and returns the first committed
/// LSN that recovery would miss, if any.
///
/// `use_watermark` is forwarded to `Model::checkpoint` and selects how the
/// checkpoint target is computed.
fn run_writers_vs_checkpointer(use_watermark: bool) -> Option<usize> {
    let shared = Arc::new(Model::new());

    let spawn_writer = |lsn: usize| {
        let handle = Arc::clone(&shared);
        thread::spawn(move || handle.order_a_write(lsn))
    };

    // Spawn order: writer 1, writer 2, then the checkpointer.
    let first_writer = spawn_writer(1);
    let second_writer = spawn_writer(2);
    let checkpointer = {
        let handle = Arc::clone(&shared);
        thread::spawn(move || handle.checkpoint(use_watermark))
    };

    first_writer.join().expect("writer 1");
    second_writer.join().expect("writer 2");
    checkpointer.join().expect("checkpointer");

    shared.first_lost_committed_write()
}
/// One writer (LSN 1) racing one watermark-based checkpoint: in every
/// interleaving loom explores, no committed write may be missing from the
/// recovered set, and every committed LSN must also be appended.
#[test]
fn watermark_single_writer_concurrent_checkpoint_loses_nothing() {
    loom::model(|| {
        let shared = Arc::new(Model::new());

        let writer_view = Arc::clone(&shared);
        let writer = thread::spawn(move || writer_view.order_a_write(1));
        let ckpt_view = Arc::clone(&shared);
        let ckpt = thread::spawn(move || ckpt_view.checkpoint(true));

        writer.join().expect("writer");
        ckpt.join().expect("checkpointer");

        // All threads are joined, so the state is stable and one evaluation
        // serves both the check and the failure message.
        let lost = shared.first_lost_committed_write();
        assert!(
            lost.is_none(),
            "committed watermark must lose no visible write (lost LSN: {:?})",
            lost
        );

        // The loop variable stays `l` because the message captures `{l}`.
        for l in (1..=MAX_LSN).filter(|&l| shared.committed[l].load(Ordering::Acquire)) {
            assert!(
                shared.appended[l].load(Ordering::Acquire),
                "Order A violated: LSN {l} is visible but not durable"
            );
        }
    });
}
/// Two writers racing one watermark-based checkpoint: no interleaving that
/// loom explores may leave a committed write outside the recovered set.
#[test]
fn watermark_two_writers_out_of_order_commit_concurrent_checkpoint_loses_nothing() {
    let scenario = || {
        let lost = run_writers_vs_checkpointer(true);
        assert!(
            lost.is_none(),
            "committed-watermark checkpoint_lsn must never lose a visible write under \
             out-of-order lock-free commit; lost LSN {lost:?} (NoLostWriteUnderLockFreeCommit \
             violated — this MUST NOT happen with the watermark)"
        );
    };
    loom::model(scenario);
}
/// Negative control: the same scenario with the appended-frontier target.
/// The test passes only if the assertion fires in some interleaving, i.e. if
/// a committed write is found missing from the recovered set.
#[test]
#[should_panic(expected = "NEGATIVE CONTROL fired")]
fn appended_frontier_negative_control_loses_a_write() {
    let scenario = || {
        let lost = run_writers_vs_checkpointer(false);
        assert!(
            lost.is_none(),
            "NEGATIVE CONTROL fired: appended-frontier checkpoint_lsn lost visible LSN \
             {lost:?} (a write appended-before-but-committed-after capture was archived \
             out of recovery's reach — exactly the GAP_LEDGER #41 footgun the committed \
             watermark closes; mirrors LockFreeDurableCheckpoint _Unsafe.cfg)"
        );
    };
    loom::model(scenario);
}