use std::collections::BTreeSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
/// Tracks the highest LSN up to which every LSN has been marked committed.
///
/// Commits may be reported in any order. An LSN that arrives ahead of a gap is
/// parked in `pending` and only folded into the watermark once every LSN
/// between the current watermark and it has also been reported.
#[derive(Debug)]
pub struct CommittedWatermark {
    /// End of the contiguous committed prefix; starts at the `base` given to
    /// [`CommittedWatermark::new`] and only ever moves forward.
    contiguous: AtomicU64,
    /// LSNs reported as committed that are above `contiguous` but not yet
    /// adjacent to it. The mutex also serialises updates to `contiguous`.
    pending: Mutex<BTreeSet<u64>>,
    /// Largest LSN handed to `set_recovery_image_coverage` since the last
    /// `take_recovery_image_coverage`; `0` after construction or a take.
    // NOTE(review): the meaning of "recovery image coverage" is defined by
    // callers outside this file — confirm before relying on it.
    image_coverage_lsn: AtomicU64,
}

impl CommittedWatermark {
    /// Creates a tracker whose committed prefix already ends at `base`.
    ///
    /// LSNs at or below `base` are treated as committed: marking them later
    /// is a no-op.
    pub fn new(base: u64) -> Self {
        Self {
            contiguous: AtomicU64::new(base),
            pending: Mutex::new(BTreeSet::new()),
            image_coverage_lsn: AtomicU64::new(0),
        }
    }

    /// Raises the stored coverage LSN to `lsn` if `lsn` is larger than the
    /// value currently stored; smaller values leave it unchanged.
    pub fn set_recovery_image_coverage(&self, lsn: u64) {
        self.image_coverage_lsn.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Returns the stored coverage LSN and resets it to `0` in one atomic
    /// swap, so a second call returns `0` unless a new value was set.
    pub fn take_recovery_image_coverage(&self) -> u64 {
        self.image_coverage_lsn.swap(0, Ordering::AcqRel)
    }

    /// Returns the current end of the contiguous committed prefix.
    #[inline]
    pub fn watermark(&self) -> u64 {
        self.contiguous.load(Ordering::Acquire)
    }

    /// Records `lsn` as committed and advances the watermark across every
    /// LSN that is now contiguous with it.
    ///
    /// Marking an LSN at or below the current watermark, or marking the same
    /// LSN more than once, has no effect.
    ///
    /// # Panics
    ///
    /// Panics if the internal `pending` mutex has been poisoned by a panic in
    /// another thread.
    pub fn mark_committed(&self, lsn: u64) {
        let mut pending = self
            .pending
            .lock()
            .expect("CommittedWatermark pending lock poisoned");

        // Read under the lock: `contiguous` is only written while `pending`
        // is held, so this value cannot move during the drain below.
        let mut cur = self.contiguous.load(Ordering::Acquire);
        if lsn <= cur {
            return;
        }
        pending.insert(lsn);

        // Drain the run of LSNs adjacent to the watermark. `checked_add`
        // stops the walk once the watermark reaches `u64::MAX`; a plain
        // `cur + 1` would overflow there and panic (poisoning the lock) in
        // builds with overflow checks enabled.
        while let Some(next) = cur.checked_add(1) {
            if !pending.remove(&next) {
                break;
            }
            cur = next;
        }
        self.contiguous.store(cur, Ordering::Release);
    }
}
#[cfg(test)]
mod tests {
    use super::CommittedWatermark;
    use std::sync::Arc;
    use std::thread;

    /// A tracker built with base 0 reports 0 before anything is marked.
    #[test]
    fn empty_base_starts_at_zero() {
        let tracker = CommittedWatermark::new(0);
        assert_eq!(tracker.watermark(), 0);
    }

    /// An LSN committed ahead of a gap must not move the watermark until the
    /// gap is filled.
    #[test]
    fn out_of_order_does_not_advance_until_prefix_closes() {
        let tracker = CommittedWatermark::new(0);

        tracker.mark_committed(2);
        assert_eq!(
            tracker.watermark(),
            0,
            "gap at LSN 1 must hold the watermark at 0"
        );

        tracker.mark_committed(1);
        assert_eq!(tracker.watermark(), 2, "closing LSN 1 drains the 1,2 run");

        tracker.mark_committed(3);
        assert_eq!(tracker.watermark(), 3);
    }

    /// LSNs 2..=5 arrive shuffled; the single commit of LSN 1 then releases
    /// the whole run at once.
    #[test]
    fn scrambled_commit_order_closes_in_one_drain() {
        let tracker = CommittedWatermark::new(0);
        let shuffled = [3u64, 5, 4, 2];

        for lsn in shuffled.iter().copied() {
            tracker.mark_committed(lsn);
            assert_eq!(
                tracker.watermark(),
                0,
                "still 0 until LSN 1 commits (have {lsn})"
            );
        }

        tracker.mark_committed(1);
        assert_eq!(tracker.watermark(), 5);
    }

    /// A non-zero base counts as an already-committed prefix, and marking an
    /// LSN below the watermark changes nothing.
    #[test]
    fn recovered_base_treated_as_committed_prefix() {
        let tracker = CommittedWatermark::new(5);
        assert_eq!(tracker.watermark(), 5);

        tracker.mark_committed(7);
        assert_eq!(
            tracker.watermark(),
            5,
            "gap at 6 holds the watermark at the base"
        );

        tracker.mark_committed(6);
        assert_eq!(tracker.watermark(), 7);

        tracker.mark_committed(3);
        assert_eq!(tracker.watermark(), 7);
    }

    /// Marking the same LSN repeatedly leaves the watermark where the first
    /// mark put it.
    #[test]
    fn marking_is_idempotent() {
        let tracker = CommittedWatermark::new(0);
        for _ in 0..3 {
            tracker.mark_committed(1);
        }
        assert_eq!(tracker.watermark(), 1);
    }

    /// Eight threads each commit an interleaved, descending slice of 1..=200;
    /// once all have finished the watermark must cover the full range.
    #[test]
    fn concurrent_committers_converge_to_full_prefix() {
        const WORKERS: usize = 8;
        const TOP: u64 = 200;

        let shared = Arc::new(CommittedWatermark::new(0));

        let mut workers = Vec::with_capacity(WORKERS);
        for offset in 0..WORKERS {
            let shared = Arc::clone(&shared);
            workers.push(thread::spawn(move || {
                // Worker `offset` commits TOP - offset, then every WORKERS-th
                // LSN below it, stopping before it would drop under 1.
                for lsn in (1..=TOP).rev().skip(offset).step_by(WORKERS) {
                    shared.mark_committed(lsn);
                }
            }));
        }

        for worker in workers {
            worker.join().expect("committer thread");
        }

        assert_eq!(
            shared.watermark(),
            TOP,
            "once every LSN 1..=200 is committed the watermark must reach 200"
        );
    }
}