sqlrite/mvcc/clock.rs
1//! [`MvccClock`] — the logical-clock primitive that hands out
2//! begin- and commit-timestamps for MVCC transactions (Phase 11.2).
3//!
4//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
5//!
6//! > A monotonic `u64` counter, per-`Database`. Hands out `begin_ts`
7//! > at `BEGIN CONCURRENT` and `commit_ts` at the start of validation.
8//! > Wrapped in `AtomicU64`; no contention because each transaction
9//! > calls it twice.
10//!
11//! The clock is persisted to the WAL header on each checkpoint so
12//! reopens resume past the highest committed timestamp — see
13//! [`crate::sql::pager::wal::WalHeader::clock_high_water`]. Without
14//! persistence, two transactions on either side of a reopen could
15//! receive the same timestamp and the snapshot-isolation visibility
16//! rule (`begin <= ts < end`) would mis-classify one of them.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19
/// Per-database logical clock that hands out MVCC timestamps.
///
/// All state lives in a single [`AtomicU64`], so every operation takes
/// `&self` and one clock can be used from many threads at once. The
/// type is deliberately *not* `Clone`: a clone would own an independent
/// counter and could hand out timestamps the original already issued.
/// Share a single instance instead — e.g. behind an `Arc<MvccClock>`,
/// which is how the planned [`Database`](crate::Database) wiring
/// (Phase 11.3) is meant to hold it. Standalone today.
#[derive(Debug, Default)]
pub struct MvccClock {
    /// Current high-water timestamp: the seed, or the last value
    /// handed out by [`MvccClock::tick`] / promoted by
    /// [`MvccClock::observe`], whichever is greatest.
    counter: AtomicU64,
}
27
28impl MvccClock {
29 /// Builds a clock seeded at `initial`. The next [`MvccClock::tick`]
30 /// returns `initial + 1`.
31 ///
32 /// Use this with the value persisted in the WAL header at open
33 /// time so the clock resumes past the last-checkpointed
34 /// high-water mark.
35 pub fn new(initial: u64) -> Self {
36 Self {
37 counter: AtomicU64::new(initial),
38 }
39 }
40
41 /// Returns the current high-water timestamp without advancing it.
42 /// Phase 11.6's GC reads this alongside
43 /// [`super::ActiveTxRegistry::min_active_begin_ts`] to decide
44 /// which row-version chains are reclaimable.
45 pub fn now(&self) -> u64 {
46 self.counter.load(Ordering::Acquire)
47 }
48
49 /// Bumps the clock by one and returns the new value. Strictly
50 /// monotonic: every call observes a distinct `u64`.
51 pub fn tick(&self) -> u64 {
52 // `fetch_add` returns the *previous* value — adjust to "after"
53 // semantics so callers see "the timestamp this call hands out".
54 // Wrap-around is impossible in practice (a billion ticks/s for
55 // 600 years still fits in `u64`), so saturating-add isn't
56 // needed.
57 self.counter.fetch_add(1, Ordering::AcqRel) + 1
58 }
59
60 /// Promotes the clock to at least `value`. No-op if `value` is at
61 /// or below the current high-water mark. Used at WAL replay to
62 /// bring the in-memory clock up to the persisted high-water
63 /// without an extra `tick()`.
64 pub fn observe(&self, value: u64) {
65 let mut current = self.counter.load(Ordering::Acquire);
66 while value > current {
67 // CAS rather than `store` — racing observers shouldn't
68 // step on each other and shouldn't move the clock
69 // backwards if a faster `tick` already overtook them.
70 match self.counter.compare_exchange_weak(
71 current,
72 value,
73 Ordering::AcqRel,
74 Ordering::Acquire,
75 ) {
76 Ok(_) => return,
77 Err(actual) => current = actual,
78 }
79 }
80 }
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn new_seeds_the_counter() {
        let clock = MvccClock::new(42);
        assert_eq!(clock.now(), 42);
        assert_eq!(clock.tick(), 43);
        assert_eq!(clock.now(), 43);
    }

    #[test]
    fn default_starts_at_zero() {
        let clock = MvccClock::default();
        assert_eq!(clock.now(), 0);
        assert_eq!(clock.tick(), 1);
    }

    #[test]
    fn tick_is_strictly_monotonic_within_a_thread() {
        let clock = MvccClock::new(0);
        // Thread the previously returned timestamp through a fold and
        // require each new one to be strictly larger.
        let _ = (0..1_000).fold(0, |last, _| {
            let t = clock.tick();
            assert!(t > last, "tick went backwards: {t} after {last}");
            t
        });
    }

    #[test]
    fn observe_only_moves_forward() {
        let clock = MvccClock::new(100);
        // (value passed to `observe`, expected high-water afterwards):
        // values below the current mark are ignored.
        for (seen, expected) in [(50, 100), (200, 200), (150, 200)] {
            clock.observe(seen);
            assert_eq!(clock.now(), expected);
        }
    }

    /// Concurrent ticks across N threads must hand out N × M distinct
    /// values with no gaps — the property MVCC visibility relies on.
    #[test]
    fn ticks_are_unique_under_contention() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 250;
        let clock = Arc::new(MvccClock::new(0));

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let clock = Arc::clone(&clock);
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|_| clock.tick())
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let mut stamps: Vec<u64> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();
        stamps.sort_unstable();

        // Adjacent equal entries in the sorted list would be duplicates.
        for pair in stamps.windows(2) {
            assert_ne!(pair[0], pair[1], "duplicate timestamp {}", pair[0]);
        }
        // Seeded at 0, so unique values bounded by 1 and THREADS*PER_THREAD
        // must cover that range exactly.
        assert_eq!(stamps.first().copied(), Some(1));
        assert_eq!(stamps.last().copied(), Some((THREADS * PER_THREAD) as u64));
    }

    /// Racing `observe` calls must leave the clock at the largest
    /// observed value, never at a smaller one.
    #[test]
    fn observe_under_contention_never_regresses() {
        const THREADS: usize = 8;
        let clock = Arc::new(MvccClock::new(0));

        let workers: Vec<_> = (1..=THREADS as u64)
            .map(|k| {
                let clock = Arc::clone(&clock);
                // Distinct, deterministic value per thread.
                thread::spawn(move || clock.observe(k * 1_000))
            })
            .collect();
        workers.into_iter().for_each(|worker| worker.join().unwrap());

        assert_eq!(clock.now(), THREADS as u64 * 1_000);
    }
}