sqlrite/mvcc/registry.rs
1//! [`ActiveTxRegistry`] — the live-transaction table that
2//! garbage collection consults to know which row versions are still
3//! possibly visible (Phase 11.2).
4//!
5//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
6//!
7//! > Versions whose `end` timestamp is older than the oldest active
8//! > reader's begin-timestamp are dead and may be reclaimed.
9//!
10//! The registry is the source of "oldest active reader's
11//! begin-timestamp" — [`ActiveTxRegistry::min_active_begin_ts`].
12//! Phase 11.6 (GC) reads it on every sweep; Phase 11.4 (commit
13//! validation) registers each `BEGIN CONCURRENT` transaction here
14//! and unregisters at COMMIT/ROLLBACK.
15//!
//! The current shape uses a single `Mutex` guarding two `BTreeMap`s
//! (one keyed by `TxId`, one keyed by `begin_ts`) for simplicity. Two
//! reasons not to over-engineer this for v0:
18//!
19//! 1. The map is only touched twice per transaction (begin +
20//! commit/rollback). Even a thousand concurrent writers hit a
21//! couple-thousand `lock` calls per second — well below mutex
22//! contention thresholds.
23//! 2. `min_active_begin_ts` is `O(log N)` on a `BTreeMap` (the
24//! smallest key is at `iter().next()`), which is fine for the
25//! "GC asks once per sweep" use case.
26//!
27//! When the GC profile shows the registry on the hot path, swap to
28//! a sharded skip list or an `RwLock`-protected sorted set. Until
29//! then this is sufficient.
30
31use std::collections::BTreeMap;
32use std::sync::{Arc, Mutex};
33
34use super::clock::MvccClock;
35
/// Identifier of a transaction, handed out by
/// [`ActiveTxRegistry::register`].
///
/// This is a newtype over `u64` rather than a bare integer so that the
/// compiler rejects code that mixes up a timestamp and a transaction
/// id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxId(pub u64);

impl std::fmt::Display for TxId {
    /// Renders the id as `tx<N>`, e.g. `tx7`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TxId(raw) = *self;
        write!(f, "tx{raw}")
    }
}
47
48/// Tagged-union "this version is in-flight under transaction `id`"
49/// vs. "this version was committed at timestamp `ts`". Per the plan,
50/// row versions carry a `begin: TxTimestampOrId` so reads can ignore
51/// versions belonging to still-open transactions while still seeing
52/// the latest committed version.
53///
54/// Phase 11.4 will be the first consumer; defining it here keeps the
55/// type stable across the in-flight sub-phases so callers don't have
56/// to chase a moving target.
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum TxTimestampOrId {
59 /// Committed timestamp — visible to any transaction whose
60 /// `begin_ts >= this`.
61 Timestamp(u64),
62 /// In-flight transaction id — invisible to every other reader
63 /// until the producing transaction commits and stamps its
64 /// versions with a timestamp.
65 Id(TxId),
66}
67
68/// Live-transaction table. Cheap to clone (internally `Arc`-wrapped
69/// state); pass clones into worker threads.
70#[derive(Clone, Debug, Default)]
71pub struct ActiveTxRegistry {
72 inner: Arc<Mutex<RegistryInner>>,
73}
74
75#[derive(Debug, Default)]
76struct RegistryInner {
77 /// `TxId` → `begin_ts`. Deterministic ordering by `TxId` (which
78 /// matches allocation order) — useful for diagnostic output.
79 by_id: BTreeMap<TxId, u64>,
80 /// Multiset of `begin_ts` values, with a count for each. Lets
81 /// `min_active_begin_ts` answer in `O(log N)` regardless of
82 /// `by_id`'s size, and `unregister` just decrements the relevant
83 /// counter rather than scanning. The shape matters once we have
84 /// many concurrent transactions all sharing the same begin_ts
85 /// (rare under MvccClock, but possible if a snapshot is taken
86 /// without ticking the clock).
87 by_ts: BTreeMap<u64, usize>,
88}
89
90impl ActiveTxRegistry {
91 /// Creates an empty registry. Equivalent to `Default::default()`.
92 pub fn new() -> Self {
93 Self::default()
94 }
95
96 /// Registers a new transaction. Allocates a fresh [`TxId`] from
97 /// `clock` and snapshots the current clock value as the
98 /// transaction's `begin_ts`.
99 ///
100 /// Returns a [`TxHandle`] — when the handle drops, the
101 /// transaction is automatically unregistered. RAII keeps the
102 /// "did the transaction's caller forget to clean up?" failure
103 /// mode out of the cold-path code.
104 pub fn register(&self, clock: &MvccClock) -> TxHandle {
105 let begin_ts = clock.tick();
106 let id = TxId(begin_ts);
107 let mut g = self.lock();
108 g.by_id.insert(id, begin_ts);
109 *g.by_ts.entry(begin_ts).or_insert(0) += 1;
110 drop(g);
111 TxHandle {
112 id,
113 begin_ts,
114 registry: self.clone(),
115 }
116 }
117
118 /// Returns the begin-timestamp of the oldest in-flight
119 /// transaction, or `None` when nothing is in flight. Phase 11.6
120 /// uses this to set the GC watermark — versions whose `end`
121 /// timestamp is strictly less than this value can never be seen
122 /// again and may be reclaimed.
123 pub fn min_active_begin_ts(&self) -> Option<u64> {
124 self.lock().by_ts.keys().next().copied()
125 }
126
127 /// Number of in-flight transactions. Cheap diagnostic accessor;
128 /// not load-bearing for correctness.
129 pub fn active_count(&self) -> usize {
130 self.lock().by_id.len()
131 }
132
133 /// Internal — dropped through [`TxHandle::drop`].
134 fn unregister(&self, id: TxId, begin_ts: u64) {
135 let mut g = self.lock();
136 g.by_id.remove(&id);
137 if let Some(slot) = g.by_ts.get_mut(&begin_ts) {
138 *slot = slot.saturating_sub(1);
139 if *slot == 0 {
140 g.by_ts.remove(&begin_ts);
141 }
142 }
143 }
144
145 fn lock(&self) -> std::sync::MutexGuard<'_, RegistryInner> {
146 self.inner
147 .lock()
148 .unwrap_or_else(|e| panic!("sqlrite: ActiveTxRegistry mutex poisoned: {e}"))
149 }
150}
151
152/// RAII guard returned by [`ActiveTxRegistry::register`]. Dropping it
153/// unregisters the transaction. A typical caller doesn't deal with it
154/// explicitly — it lives on the `ConcurrentTx` struct (Phase 11.4)
155/// and is dropped when the transaction commits or rolls back.
156#[derive(Debug)]
157pub struct TxHandle {
158 id: TxId,
159 begin_ts: u64,
160 registry: ActiveTxRegistry,
161}
162
163impl TxHandle {
164 /// The opaque identifier this transaction was allocated. Stable
165 /// for the handle's lifetime.
166 pub fn id(&self) -> TxId {
167 self.id
168 }
169
170 /// The timestamp at which this transaction's snapshot was taken.
171 /// Phase 11.3 reads use this as the visibility cutoff: a row
172 /// version with `begin <= self.begin_ts() < end` is the visible
173 /// one.
174 pub fn begin_ts(&self) -> u64 {
175 self.begin_ts
176 }
177}
178
179impl Drop for TxHandle {
180 fn drop(&mut self) {
181 self.registry.unregister(self.id, self.begin_ts);
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// A clock starting at zero plus an empty registry — the starting
    /// point shared by most single-threaded tests below.
    fn fixture() -> (MvccClock, ActiveTxRegistry) {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        (clock, registry)
    }

    #[test]
    fn empty_registry_has_no_minimum() {
        let registry = ActiveTxRegistry::new();
        assert_eq!(registry.min_active_begin_ts(), None);
        assert_eq!(registry.active_count(), 0);
    }

    #[test]
    fn register_advances_clock_and_updates_minimum() {
        let (clock, registry) = fixture();

        let first = registry.register(&clock);
        assert_eq!(first.begin_ts(), 1);
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        let second = registry.register(&clock);
        assert_eq!(second.begin_ts(), 2);
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        // Once the older transaction is gone the minimum moves up.
        drop(first);
        assert_eq!(registry.min_active_begin_ts(), Some(2));

        drop(second);
        assert_eq!(registry.min_active_begin_ts(), None);
    }

    #[test]
    fn handles_carry_distinct_ids_and_unique_timestamps() {
        let (clock, registry) = fixture();
        let first = registry.register(&clock);
        let second = registry.register(&clock);
        assert_ne!(first.id(), second.id());
        assert_ne!(first.begin_ts(), second.begin_ts());
        assert_eq!(registry.active_count(), 2);
    }

    #[test]
    fn unregister_in_arbitrary_order_keeps_minimum_correct() {
        let (clock, registry) = fixture();
        let oldest = registry.register(&clock); // begin_ts = 1
        let middle = registry.register(&clock); // begin_ts = 2
        let newest = registry.register(&clock); // begin_ts = 3
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        // Removing the middle one leaves the oldest as the minimum.
        drop(middle);
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        // Removing the oldest makes the minimum jump to the newest.
        drop(oldest);
        assert_eq!(registry.min_active_begin_ts(), Some(3));

        drop(newest);
        assert_eq!(registry.min_active_begin_ts(), None);
    }

    #[test]
    fn registry_is_send_and_sync() {
        // Purely a compile-time check: the registry (and its handles)
        // must be usable from worker threads.
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<ActiveTxRegistry>();
        assert_sync::<ActiveTxRegistry>();
        assert_send::<TxHandle>();
        assert_sync::<TxHandle>();
    }

    /// Registers from many threads at once: every begin_ts has to be
    /// unique and the registry's count has to equal the number of
    /// handles that are still alive.
    #[test]
    fn concurrent_registrations_are_consistent() {
        use std::collections::BTreeSet;
        use std::thread;

        const THREADS: usize = 8;
        const PER_THREAD: usize = 100;

        let clock = Arc::new(MvccClock::new(0));
        let registry = ActiveTxRegistry::new();

        let mut workers = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let worker_clock = Arc::clone(&clock);
            let worker_registry = registry.clone();
            workers.push(thread::spawn(move || {
                // Hand the handles back instead of dropping them, so
                // the spawning thread holds all of them alive at once.
                (0..PER_THREAD)
                    .map(|_| worker_registry.register(&worker_clock))
                    .collect::<Vec<TxHandle>>()
            }));
        }

        let live: Vec<TxHandle> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();

        assert_eq!(registry.active_count(), THREADS * PER_THREAD);
        let begins: BTreeSet<u64> = live.iter().map(TxHandle::begin_ts).collect();
        assert_eq!(
            begins.len(),
            THREADS * PER_THREAD,
            "every concurrent registration must allocate a unique begin_ts"
        );

        // With every handle dropped the registry must be empty again.
        drop(live);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_begin_ts(), None);
    }

    #[test]
    fn tx_id_displays_with_prefix() {
        assert_eq!(TxId(7).to_string(), "tx7");
    }

    #[test]
    fn tx_timestamp_or_id_round_trips() {
        // Exercises the derived equality and copy semantics so that a
        // refactor which changes the variants shows up here.
        let committed = TxTimestampOrId::Timestamp(42);
        let in_flight = TxTimestampOrId::Id(TxId(42));
        let committed_copy = committed;
        let in_flight_copy = in_flight;
        assert_ne!(committed, in_flight);
        assert_eq!(committed, committed_copy);
        assert_eq!(in_flight, in_flight_copy);
    }
}