use std::sync::atomic::{AtomicU64, Ordering};
/// A lock-free counter that hands out `u64` timestamps.
///
/// The clock is a single atomic counter: [`MvccClock::tick`] advances it by one,
/// [`MvccClock::observe`] raises it to at least an externally supplied value, and
/// [`MvccClock::now`] reads it. None of these operations ever lowers the counter.
///
/// The derived `Default` yields a clock that starts at `0`.
#[derive(Debug, Default)]
pub struct MvccClock {
    /// Highest timestamp issued by `tick` or accepted by `observe` so far.
    counter: AtomicU64,
}

impl MvccClock {
    /// Creates a clock whose current timestamp is `initial`.
    ///
    /// The first [`tick`](Self::tick) on the new clock returns `initial + 1`.
    #[must_use]
    pub fn new(initial: u64) -> Self {
        Self {
            counter: AtomicU64::new(initial),
        }
    }

    /// Returns the current timestamp without advancing the clock.
    #[must_use]
    pub fn now(&self) -> u64 {
        self.counter.load(Ordering::Acquire)
    }

    /// Advances the clock by one and returns the new timestamp.
    ///
    /// Every call that returns yields a distinct value, one greater than the
    /// counter value it replaced.
    ///
    /// # Panics
    ///
    /// Panics if the clock is already at `u64::MAX`. The counter is left
    /// unchanged in that case, so the clock is never rewound and every later
    /// `tick` fails the same way, in debug and release builds alike.
    pub fn tick(&self) -> u64 {
        // A plain `fetch_add` wraps the stored counter to 0 once it reaches
        // `u64::MAX`, which would rewind the clock. `checked_add` inside
        // `fetch_update` rejects that update instead of storing it.
        let previous = self
            .counter
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current.checked_add(1)
            })
            .expect("MvccClock timestamp space exhausted: counter is at u64::MAX");
        // Cannot overflow: the update above only succeeds when `previous + 1` fits.
        previous + 1
    }

    /// Raises the clock to `value` if `value` is ahead of it.
    ///
    /// A `value` that is less than or equal to the current timestamp leaves
    /// the clock unchanged.
    pub fn observe(&self, value: u64) {
        // Read first so that a stale `value` costs a load rather than an
        // atomic read-modify-write.
        if value > self.counter.load(Ordering::Acquire) {
            // `fetch_max` re-checks atomically, so a concurrent advance past
            // `value` between the load and this call is never undone.
            self.counter.fetch_max(value, Ordering::AcqRel);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    // `new` starts the clock at the supplied value and `tick` moves it by one.
    #[test]
    fn new_seeds_the_counter() {
        let clock = MvccClock::new(42);
        assert_eq!(clock.now(), 42);
        assert_eq!(clock.tick(), 43);
        assert_eq!(clock.now(), 43);
    }

    // The derived `Default` is a clock at zero.
    #[test]
    fn default_starts_at_zero() {
        let clock = MvccClock::default();
        assert_eq!(clock.now(), 0);
        assert_eq!(clock.tick(), 1);
    }

    // Each tick on one thread must exceed the tick before it.
    #[test]
    fn tick_is_strictly_monotonic_within_a_thread() {
        let clock = MvccClock::new(0);
        let mut last = 0;
        (0..1_000).for_each(|_| {
            let t = clock.tick();
            assert!(t > last, "tick went backwards: {t} after {last}");
            last = t;
        });
    }

    // Observing a smaller value is a no-op; observing a larger one jumps forward.
    #[test]
    fn observe_only_moves_forward() {
        let clock = MvccClock::new(100);
        // (value passed to `observe`, value `now` must report afterwards)
        for (observed, expected) in [(50, 100), (200, 200), (150, 200)] {
            clock.observe(observed);
            assert_eq!(clock.now(), expected);
        }
    }

    // Several threads ticking one clock must collectively see 1..=N exactly once each.
    #[test]
    fn ticks_are_unique_under_contention() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 250;

        let clock = Arc::new(MvccClock::new(0));

        // Spawn every worker before joining any of them so they actually overlap.
        let mut workers = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let shared = Arc::clone(&clock);
            workers.push(thread::spawn(move || {
                (0..PER_THREAD)
                    .map(|_| shared.tick())
                    .collect::<Vec<u64>>()
            }));
        }

        let mut all: Vec<u64> = Vec::with_capacity(THREADS * PER_THREAD);
        for worker in workers {
            all.extend(worker.join().unwrap());
        }
        all.sort_unstable();

        // After sorting, any duplicate would sit next to its twin.
        all.windows(2)
            .for_each(|w| assert_ne!(w[0], w[1], "duplicate timestamp {}", w[0]));
        assert_eq!(all.first().copied(), Some(1));
        assert_eq!(all.last().copied(), Some((THREADS * PER_THREAD) as u64));
    }

    // Racing observers must leave the clock at the largest observed value.
    #[test]
    fn observe_under_contention_never_regresses() {
        const THREADS: usize = 8;

        let clock = Arc::new(MvccClock::new(0));

        // Worker `rank` (1-based) observes `rank * 1_000`.
        let workers: Vec<_> = (1..=THREADS as u64)
            .map(|rank| {
                let shared = Arc::clone(&clock);
                thread::spawn(move || shared.observe(rank * 1_000))
            })
            .collect();
        workers.into_iter().for_each(|w| w.join().unwrap());

        assert_eq!(clock.now(), THREADS as u64 * 1_000);
    }
}