use std::collections::HashSet;
use crate::sync::{self, AtomicU64, Mutex, Ordering};
use crate::timestamp::Timestamp;
pub(crate) struct Oracle {
next_ts: AtomicU64,
read_ts: AtomicU64,
pending: Mutex<Pending>,
}
struct Pending {
done_upto: u64,
ahead: HashSet<u64>,
}
impl Oracle {
pub(crate) fn new() -> Self {
Oracle {
next_ts: AtomicU64::new(1),
read_ts: AtomicU64::new(Timestamp::ZERO.get()),
pending: Mutex::new(Pending {
done_upto: Timestamp::ZERO.get(),
ahead: HashSet::new(),
}),
}
}
#[cfg(feature = "durability")]
pub(crate) fn recovered(highest: Timestamp) -> Self {
let highest = highest.get();
Oracle {
next_ts: AtomicU64::new(highest + 1),
read_ts: AtomicU64::new(highest),
pending: Mutex::new(Pending {
done_upto: highest,
ahead: HashSet::new(),
}),
}
}
#[inline]
pub(crate) fn read_ts(&self) -> Timestamp {
Timestamp::from_raw(self.read_ts.load(Ordering::Acquire))
}
#[inline]
pub(crate) fn alloc_commit_ts(&self) -> Timestamp {
Timestamp::from_raw(self.next_ts.fetch_add(1, Ordering::Relaxed))
}
pub(crate) fn commit_done(&self, ts: Timestamp) {
let ts = ts.get();
let mut p = sync::lock(&self.pending);
if ts == p.done_upto + 1 {
p.done_upto = ts;
let mut next = ts + 1;
while p.ahead.remove(&next) {
p.done_upto = next;
next += 1;
}
self.read_ts.store(p.done_upto, Ordering::Release);
} else {
let _ = p.ahead.insert(ts);
}
}
}
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
#[test]
fn test_new_oracle_reads_at_zero() {
let o = Oracle::new();
assert_eq!(o.read_ts(), Timestamp::ZERO);
}
#[test]
fn test_commit_ts_is_strictly_increasing() {
let o = Oracle::new();
let a = o.alloc_commit_ts();
let b = o.alloc_commit_ts();
assert!(b > a);
assert_eq!(a, Timestamp::from_raw(1));
assert_eq!(b, Timestamp::from_raw(2));
}
#[test]
fn test_watermark_advances_on_in_order_completion() {
let o = Oracle::new();
let t1 = o.alloc_commit_ts();
o.commit_done(t1);
assert_eq!(o.read_ts(), Timestamp::from_raw(1));
}
#[test]
fn test_watermark_waits_for_earlier_timestamp() {
let o = Oracle::new();
let t1 = o.alloc_commit_ts();
let t2 = o.alloc_commit_ts();
o.commit_done(t2);
assert_eq!(o.read_ts(), Timestamp::ZERO);
o.commit_done(t1);
assert_eq!(o.read_ts(), Timestamp::from_raw(2));
}
}