use std::collections::{BTreeMap, HashSet};
use crate::sync::{self, AtomicU64, Mutex, Ordering};
use crate::timestamp::Timestamp;
/// Timestamp oracle: hands out commit timestamps, publishes the snapshot
/// timestamp readers may use, and tracks which snapshots are still in use.
pub(crate) struct Oracle {
/// Next commit timestamp to hand out; bumped by `alloc_commit_ts`.
next_ts: AtomicU64,
/// Snapshot timestamp currently published to readers. `commit_done` stores
/// `Pending::done_upto` here whenever that value advances.
read_ts: AtomicU64,
/// Completion bookkeeping used by `commit_done` to advance `read_ts`.
pending: Mutex<Pending>,
/// Active readers, keyed by the snapshot timestamp they registered at, with
/// the number of readers at that timestamp as the value. Entries are removed
/// when their count reaches zero, so the first key is the oldest snapshot
/// still in use.
readers: Mutex<BTreeMap<u64, usize>>,
}
/// Commit-completion state guarded by `Oracle::pending`.
struct Pending {
/// Highest timestamp reached by consecutive completions: `commit_done`
/// advances it only when the reported timestamp is exactly `done_upto + 1`
/// (and then through any directly following entries in `ahead`).
done_upto: u64,
/// Timestamps reported done that could not yet be folded into `done_upto`
/// because an earlier timestamp has not been reported.
ahead: HashSet<u64>,
}
impl Oracle {
    /// Creates an oracle with no history: the first commit timestamp handed
    /// out is `1`, and the published snapshot starts at [`Timestamp::ZERO`].
    pub(crate) fn new() -> Self {
        Oracle {
            next_ts: AtomicU64::new(1),
            read_ts: AtomicU64::new(Timestamp::ZERO.get()),
            pending: Mutex::new(Pending {
                done_upto: Timestamp::ZERO.get(),
                ahead: HashSet::new(),
            }),
            readers: Mutex::new(BTreeMap::new()),
        }
    }

    /// Creates an oracle that resumes after `highest`: everything up to and
    /// including `highest` is treated as already done and readable, and the
    /// next commit timestamp handed out is `highest + 1`.
    #[cfg(feature = "durability")]
    pub(crate) fn recovered(highest: Timestamp) -> Self {
        let highest = highest.get();
        Oracle {
            next_ts: AtomicU64::new(highest + 1),
            read_ts: AtomicU64::new(highest),
            pending: Mutex::new(Pending {
                done_upto: highest,
                ahead: HashSet::new(),
            }),
            readers: Mutex::new(BTreeMap::new()),
        }
    }

    /// Returns the currently published snapshot timestamp.
    #[inline]
    pub(crate) fn read_ts(&self) -> Timestamp {
        Timestamp::from_raw(self.read_ts.load(Ordering::Acquire))
    }

    /// Reserves and returns the next commit timestamp. Each call returns a
    /// value one greater than the previous call.
    #[inline]
    pub(crate) fn alloc_commit_ts(&self) -> Timestamp {
        Timestamp::from_raw(self.next_ts.fetch_add(1, Ordering::Relaxed))
    }

    /// Registers a reader at the current snapshot and returns that snapshot.
    ///
    /// Every call must be paired with an [`Oracle::end_reader`] call passing
    /// the returned timestamp, otherwise the snapshot stays pinned in
    /// [`Oracle::low_watermark`].
    pub(crate) fn begin_reader(&self) -> Timestamp {
        // The snapshot is loaded while the readers lock is held, so picking the
        // timestamp and registering it cannot interleave with `low_watermark`,
        // which takes the same lock.
        let mut readers = sync::lock(&self.readers);
        let ts = self.read_ts.load(Ordering::Acquire);
        *readers.entry(ts).or_insert(0) += 1;
        Timestamp::from_raw(ts)
    }

    /// Unregisters one reader previously registered at `ts`.
    ///
    /// The entry for `ts` is removed once its last reader is gone. A `ts` with
    /// no registered reader is ignored.
    pub(crate) fn end_reader(&self, ts: Timestamp) {
        let key = ts.get();
        let mut readers = sync::lock(&self.readers);
        if let Some(count) = readers.get_mut(&key) {
            // Entries are never stored with a zero count, so this cannot
            // underflow.
            *count -= 1;
            if *count == 0 {
                let _ = readers.remove(&key);
            }
        }
    }

    /// Returns the oldest snapshot still held by a registered reader, or the
    /// current published snapshot when no reader is registered.
    pub(crate) fn low_watermark(&self) -> Timestamp {
        // The lock stays held across the fallback load of `read_ts`, mirroring
        // the lock-then-load order used by `begin_reader`.
        let readers = sync::lock(&self.readers);
        let oldest = readers
            .keys()
            .next()
            .copied()
            .unwrap_or_else(|| self.read_ts.load(Ordering::Acquire));
        Timestamp::from_raw(oldest)
    }

    /// Records that the commit at `ts` has finished.
    ///
    /// The published snapshot advances only across consecutive completed
    /// timestamps: if `ts` directly follows the completed prefix, the prefix
    /// is extended through `ts` and any directly following timestamps that
    /// were reported earlier, and the result is published. Otherwise `ts` is
    /// parked until the gap before it is filled.
    ///
    /// A `ts` at or below the completed prefix (a duplicate or stale report)
    /// is ignored.
    pub(crate) fn commit_done(&self, ts: Timestamp) {
        let ts = ts.get();
        let mut p = sync::lock(&self.pending);

        // Already covered by the completed prefix. Parking it would leak: the
        // drain loop below only ever removes timestamps above `done_upto`.
        if ts <= p.done_upto {
            return;
        }

        // `ts > done_upto` here, so `done_upto + 1` cannot overflow.
        if ts != p.done_upto + 1 {
            let _ = p.ahead.insert(ts);
            return;
        }

        // Extend the prefix through `ts` and every parked successor.
        let mut done = ts;
        while p.ahead.remove(&(done + 1)) {
            done += 1;
        }
        p.done_upto = done;
        self.read_ts.store(done, Ordering::Release);
    }
}
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    /// A fresh oracle publishes the zero snapshot.
    #[test]
    fn test_new_oracle_reads_at_zero() {
        let oracle = Oracle::new();
        assert_eq!(oracle.read_ts(), Timestamp::ZERO);
    }

    /// Commit timestamps start at 1 and grow by one per allocation.
    #[test]
    fn test_commit_ts_is_strictly_increasing() {
        let oracle = Oracle::new();
        let first = oracle.alloc_commit_ts();
        let second = oracle.alloc_commit_ts();
        assert!(second > first);
        assert_eq!(first, Timestamp::from_raw(1));
        assert_eq!(second, Timestamp::from_raw(2));
    }

    /// Completing the next expected timestamp publishes it immediately.
    #[test]
    fn test_watermark_advances_on_in_order_completion() {
        let oracle = Oracle::new();
        let only = oracle.alloc_commit_ts();
        oracle.commit_done(only);
        assert_eq!(oracle.read_ts(), Timestamp::from_raw(1));
    }

    /// A later completion stays unpublished until the earlier one finishes;
    /// then both become visible at once.
    #[test]
    fn test_watermark_waits_for_earlier_timestamp() {
        let oracle = Oracle::new();
        let earlier = oracle.alloc_commit_ts();
        let later = oracle.alloc_commit_ts();

        oracle.commit_done(later);
        assert_eq!(oracle.read_ts(), Timestamp::ZERO);

        oracle.commit_done(earlier);
        assert_eq!(oracle.read_ts(), Timestamp::from_raw(2));
    }
}