Skip to main content

sqlrite/mvcc/
clock.rs

1//! [`MvccClock`] — the logical-clock primitive that hands out
2//! begin- and commit-timestamps for MVCC transactions (Phase 11.2).
3//!
4//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
5//!
6//! > A monotonic `u64` counter, per-`Database`. Hands out `begin_ts`
7//! > at `BEGIN CONCURRENT` and `commit_ts` at the start of validation.
8//! > Wrapped in `AtomicU64`; no contention because each transaction
9//! > calls it twice.
10//!
11//! The clock is persisted to the WAL header on each checkpoint so
12//! reopens resume past the highest committed timestamp — see
13//! [`crate::sql::pager::wal::WalHeader::clock_high_water`]. Without
14//! persistence, two transactions on either side of a reopen could
15//! receive the same timestamp and the snapshot-isolation visibility
16//! rule (`begin <= ts < end`) would mis-classify one of them.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19
/// Logical clock that hands out MVCC begin- and commit-timestamps.
///
/// One clock is meant to exist per [`Database`](crate::Database) (wired
/// in by Phase 11.3; standalone today). It is a thin wrapper over an
/// [`AtomicU64`] and every method takes `&self`, so a single instance can
/// be shared between threads, e.g. behind an [`Arc`](std::sync::Arc).
/// The type does not implement `Clone`: a copy would fork the counter,
/// and the two copies could then hand out the same timestamp.
#[derive(Debug, Default)]
pub struct MvccClock {
    /// Current high-water mark: the greatest of the seed, the last value
    /// handed out by `tick`, and the largest value passed to `observe`.
    counter: AtomicU64,
}

impl MvccClock {
    /// Builds a clock seeded at `initial`. The next [`MvccClock::tick`]
    /// returns `initial + 1`.
    ///
    /// Use this with the value persisted in the WAL header at open
    /// time so the clock resumes past the last-checkpointed
    /// high-water mark.
    pub fn new(initial: u64) -> Self {
        Self {
            counter: AtomicU64::new(initial),
        }
    }

    /// Returns the current high-water timestamp without advancing it.
    /// Phase 11.6's GC reads this alongside
    /// [`super::ActiveTxRegistry::min_active_begin_ts`] to decide
    /// which row-version chains are reclaimable.
    pub fn now(&self) -> u64 {
        self.counter.load(Ordering::Acquire)
    }

    /// Bumps the clock by one and returns the new value.
    ///
    /// Strictly monotonic across threads: every call returns a distinct
    /// value, greater than any value that happened-before it (a prior
    /// `tick` result, the seed, or an argument to [`MvccClock::observe`]).
    ///
    /// # Panics
    ///
    /// Panics if the clock already sits at `u64::MAX`. The counter is left
    /// unchanged in that case, so it never wraps back to 0 and re-issues
    /// timestamps that are already in use.
    pub fn tick(&self) -> u64 {
        // `fetch_update` + `checked_add` instead of `fetch_add`: the seed
        // comes from persisted state (`new` / `observe`), so a corrupt
        // high-water mark at `u64::MAX` must not silently wrap the clock
        // to 0. Organic exhaustion is not a practical concern: at a
        // billion ticks per second from 0, reaching `u64::MAX` takes
        // roughly 584 years.
        let bumped = self
            .counter
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| cur.checked_add(1));
        match bumped {
            // `fetch_update` yields the *previous* value; hand out the new
            // one. Cannot overflow: `checked_add` just succeeded on it.
            Ok(prev) => prev + 1,
            Err(_) => panic!("MvccClock exhausted: tick would overflow u64::MAX"),
        }
    }

    /// Promotes the clock to at least `value`. A logical no-op if `value`
    /// is at or below the current high-water mark. Used at WAL replay to
    /// bring the in-memory clock up to the persisted high-water without
    /// an extra `tick()`.
    pub fn observe(&self, value: u64) {
        // One atomic read-modify-write max. Racing observers can't step on
        // each other, and a `tick` that already overtook `value` is never
        // rolled back, which a plain `store` could do.
        self.counter.fetch_max(value, Ordering::AcqRel);
    }
}
82
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn new_seeds_the_counter() {
        let c = MvccClock::new(42);
        assert_eq!(c.now(), 42);
        assert_eq!(c.tick(), 43);
        assert_eq!(c.now(), 43);
    }

    #[test]
    fn default_starts_at_zero() {
        let c = MvccClock::default();
        assert_eq!(c.now(), 0);
        assert_eq!(c.tick(), 1);
    }

    #[test]
    fn tick_is_strictly_monotonic_within_a_thread() {
        let c = MvccClock::new(0);
        let mut last = 0;
        for _ in 0..1_000 {
            let t = c.tick();
            assert!(t > last, "tick went backwards: {t} after {last}");
            last = t;
        }
    }

    #[test]
    fn observe_only_moves_forward() {
        let c = MvccClock::new(100);
        c.observe(50); // ignored — below current
        assert_eq!(c.now(), 100);
        c.observe(200);
        assert_eq!(c.now(), 200);
        c.observe(150); // ignored — below current
        assert_eq!(c.now(), 200);
    }

    /// The WAL-replay path: promoting via `observe` must make the next
    /// `tick` resume just past the observed high-water mark.
    #[test]
    fn tick_after_observe_resumes_past_high_water() {
        let c = MvccClock::new(0);
        c.observe(500);
        assert_eq!(c.tick(), 501);
        assert_eq!(c.now(), 501);
    }

    /// Concurrent ticks across N threads must hand out N × M distinct
    /// values (no duplicates, no skipped values). This is the property
    /// MVCC visibility relies on.
    #[test]
    fn ticks_are_unique_under_contention() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 250;
        let clock = Arc::new(MvccClock::new(0));

        let handles: Vec<_> = (0..THREADS)
            .map(|_| {
                let c = Arc::clone(&clock);
                thread::spawn(move || {
                    let mut out = Vec::with_capacity(PER_THREAD);
                    for _ in 0..PER_THREAD {
                        out.push(c.tick());
                    }
                    out
                })
            })
            .collect();

        let mut all = Vec::with_capacity(THREADS * PER_THREAD);
        for h in handles {
            all.extend(h.join().unwrap());
        }
        all.sort_unstable();
        // No duplicates.
        for w in all.windows(2) {
            assert_ne!(w[0], w[1], "duplicate timestamp {}", w[0]);
        }
        // Range is contiguous 1..=THREADS*PER_THREAD (clock seeded at 0).
        assert_eq!(all.first().copied(), Some(1));
        assert_eq!(all.last().copied(), Some((THREADS * PER_THREAD) as u64));
    }

    /// Concurrent `observe`s must not move the clock backwards: right
    /// after its own `observe(v)`, each thread must see the clock at or
    /// above `v`, and the clock must end exactly at the largest value.
    #[test]
    fn observe_under_contention_never_regresses() {
        const THREADS: usize = 8;
        let clock = Arc::new(MvccClock::new(0));
        let handles: Vec<_> = (0..THREADS)
            .map(|tid| {
                let c = Arc::clone(&clock);
                thread::spawn(move || {
                    // Each thread observes a deterministic distinct
                    // value; the clock should end at the max.
                    let target = (tid as u64 + 1) * 1_000;
                    c.observe(target);
                    let seen = c.now();
                    assert!(seen >= target, "clock regressed: {seen} < {target}");
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(clock.now(), THREADS as u64 * 1_000);
    }

    /// `observe` racing with `tick` must never roll the clock back: every
    /// thread's ticks stay strictly increasing, and a tick issued after
    /// the thread's own `observe(v)` lands strictly above `v`.
    #[test]
    fn observe_racing_ticks_never_regresses() {
        const THREADS: usize = 8;
        const ROUNDS: u64 = 200;
        let clock = Arc::new(MvccClock::new(0));
        let handles: Vec<_> = (0..THREADS)
            .map(|tid| {
                let c = Arc::clone(&clock);
                thread::spawn(move || {
                    let mut last = 0;
                    for round in 0..ROUNDS {
                        // Half the rounds (staggered per thread) jump the
                        // clock forward; the other half only tick.
                        let floor = if (tid as u64 + round) % 2 == 0 {
                            let v = round * 10;
                            c.observe(v);
                            v
                        } else {
                            0
                        };
                        let t = c.tick();
                        assert!(t > last, "tick went backwards: {t} after {last}");
                        assert!(t > floor, "tick {t} not past observed {floor}");
                        assert!(c.now() >= t, "now() fell below a handed-out tick");
                        last = t;
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }
}