Skip to main content

sqlrite/mvcc/
registry.rs

1//! [`ActiveTxRegistry`] — the live-transaction table that
2//! garbage collection consults to know which row versions are still
3//! possibly visible (Phase 11.2).
4//!
5//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
6//!
7//! > Versions whose `end` timestamp is older than the oldest active
8//! > reader's begin-timestamp are dead and may be reclaimed.
9//!
10//! The registry is the source of "oldest active reader's
11//! begin-timestamp" — [`ActiveTxRegistry::min_active_begin_ts`].
12//! Phase 11.6 (GC) reads it on every sweep; Phase 11.4 (commit
13//! validation) registers each `BEGIN CONCURRENT` transaction here
14//! and unregisters at COMMIT/ROLLBACK.
15//!
16//! The current shape uses a `Mutex<BTreeMap>` for simplicity. Two
17//! reasons not to over-engineer this for v0:
18//!
19//! 1. The map is only touched twice per transaction (begin +
20//!    commit/rollback). Even a thousand concurrent writers, each
21//!    running one transaction per second, generate only about two
22//!    thousand `lock` calls per second — far too few to contend the mutex.
23//! 2. `min_active_begin_ts` is `O(log N)` on a `BTreeMap` (the
24//!    smallest key is at `iter().next()`), which is fine for the
25//!    "GC asks once per sweep" use case.
26//!
27//! When the GC profile shows the registry on the hot path, swap to
28//! a sharded skip list or an `RwLock`-protected sorted set. Until
29//! then this is sufficient.
30
31use std::collections::BTreeMap;
32use std::sync::{Arc, Mutex};
33
34use super::clock::MvccClock;
35
/// Opaque transaction identifier.
///
/// A newtype over `u64`, so a raw timestamp cannot be passed where a
/// `TxId` is expected (or the reverse) without an explicit wrap.
/// Handed out by [`ActiveTxRegistry::register`].
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)]
pub struct TxId(pub u64);

impl std::fmt::Display for TxId {
    /// Renders the id as `tx<N>`, e.g. `tx7`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TxId(raw) = *self;
        f.write_str("tx")?;
        write!(f, "{raw}")
    }
}
47
48/// Tagged-union "this version is in-flight under transaction `id`"
49/// vs. "this version was committed at timestamp `ts`". Per the plan,
50/// row versions carry a `begin: TxTimestampOrId` so reads can ignore
51/// versions belonging to still-open transactions while still seeing
52/// the latest committed version.
53///
54/// Phase 11.4 will be the first consumer; defining it here keeps the
55/// type stable across the in-flight sub-phases so callers don't have
56/// to chase a moving target.
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum TxTimestampOrId {
59    /// Committed timestamp — visible to any transaction whose
60    /// `begin_ts >= this`.
61    Timestamp(u64),
62    /// In-flight transaction id — invisible to every other reader
63    /// until the producing transaction commits and stamps its
64    /// versions with a timestamp.
65    Id(TxId),
66}
67
68/// Live-transaction table. Cheap to clone (internally `Arc`-wrapped
69/// state); pass clones into worker threads.
70#[derive(Clone, Debug, Default)]
71pub struct ActiveTxRegistry {
72    inner: Arc<Mutex<RegistryInner>>,
73}
74
75#[derive(Debug, Default)]
76struct RegistryInner {
77    /// `TxId` → `begin_ts`. Deterministic ordering by `TxId` (which
78    /// matches allocation order) — useful for diagnostic output.
79    by_id: BTreeMap<TxId, u64>,
80    /// Multiset of `begin_ts` values, with a count for each. Lets
81    /// `min_active_begin_ts` answer in `O(log N)` regardless of
82    /// `by_id`'s size, and `unregister` just decrements the relevant
83    /// counter rather than scanning. The shape matters once we have
84    /// many concurrent transactions all sharing the same begin_ts
85    /// (rare under MvccClock, but possible if a snapshot is taken
86    /// without ticking the clock).
87    by_ts: BTreeMap<u64, usize>,
88}
89
90impl ActiveTxRegistry {
91    /// Creates an empty registry. Equivalent to `Default::default()`.
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Registers a new transaction. Allocates a fresh [`TxId`] from
97    /// `clock` and snapshots the current clock value as the
98    /// transaction's `begin_ts`.
99    ///
100    /// Returns a [`TxHandle`] — when the handle drops, the
101    /// transaction is automatically unregistered. RAII keeps the
102    /// "did the transaction's caller forget to clean up?" failure
103    /// mode out of the cold-path code.
104    pub fn register(&self, clock: &MvccClock) -> TxHandle {
105        let begin_ts = clock.tick();
106        let id = TxId(begin_ts);
107        let mut g = self.lock();
108        g.by_id.insert(id, begin_ts);
109        *g.by_ts.entry(begin_ts).or_insert(0) += 1;
110        drop(g);
111        TxHandle {
112            id,
113            begin_ts,
114            registry: self.clone(),
115        }
116    }
117
118    /// Returns the begin-timestamp of the oldest in-flight
119    /// transaction, or `None` when nothing is in flight. Phase 11.6
120    /// uses this to set the GC watermark — versions whose `end`
121    /// timestamp is strictly less than this value can never be seen
122    /// again and may be reclaimed.
123    pub fn min_active_begin_ts(&self) -> Option<u64> {
124        self.lock().by_ts.keys().next().copied()
125    }
126
127    /// Number of in-flight transactions. Cheap diagnostic accessor;
128    /// not load-bearing for correctness.
129    pub fn active_count(&self) -> usize {
130        self.lock().by_id.len()
131    }
132
133    /// Internal — dropped through [`TxHandle::drop`].
134    fn unregister(&self, id: TxId, begin_ts: u64) {
135        let mut g = self.lock();
136        g.by_id.remove(&id);
137        if let Some(slot) = g.by_ts.get_mut(&begin_ts) {
138            *slot = slot.saturating_sub(1);
139            if *slot == 0 {
140                g.by_ts.remove(&begin_ts);
141            }
142        }
143    }
144
145    fn lock(&self) -> std::sync::MutexGuard<'_, RegistryInner> {
146        self.inner
147            .lock()
148            .unwrap_or_else(|e| panic!("sqlrite: ActiveTxRegistry mutex poisoned: {e}"))
149    }
150}
151
152/// RAII guard returned by [`ActiveTxRegistry::register`]. Dropping it
153/// unregisters the transaction. A typical caller doesn't deal with it
154/// explicitly — it lives on the `ConcurrentTx` struct (Phase 11.4)
155/// and is dropped when the transaction commits or rolls back.
156#[derive(Debug)]
157pub struct TxHandle {
158    id: TxId,
159    begin_ts: u64,
160    registry: ActiveTxRegistry,
161}
162
163impl TxHandle {
164    /// The opaque identifier this transaction was allocated. Stable
165    /// for the handle's lifetime.
166    pub fn id(&self) -> TxId {
167        self.id
168    }
169
170    /// The timestamp at which this transaction's snapshot was taken.
171    /// Phase 11.3 reads use this as the visibility cutoff: a row
172    /// version with `begin <= self.begin_ts() < end` is the visible
173    /// one.
174    pub fn begin_ts(&self) -> u64 {
175        self.begin_ts
176    }
177}
178
179impl Drop for TxHandle {
180    fn drop(&mut self) {
181        self.registry.unregister(self.id, self.begin_ts);
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;
    use std::thread;

    #[test]
    fn empty_registry_has_no_minimum() {
        let registry = ActiveTxRegistry::new();
        assert_eq!(registry.active_count(), 0);
        assert!(registry.min_active_begin_ts().is_none());
    }

    #[test]
    fn register_advances_clock_and_updates_minimum() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();

        let first = registry.register(&clock);
        assert_eq!(first.begin_ts(), 1);
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        let second = registry.register(&clock);
        assert_eq!(second.begin_ts(), 2);
        // The older transaction still pins the minimum.
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        // Finishing the older transaction lets the minimum advance.
        drop(first);
        assert_eq!(registry.min_active_begin_ts(), Some(2));

        drop(second);
        assert_eq!(registry.min_active_begin_ts(), None);
    }

    #[test]
    fn handles_carry_distinct_ids_and_unique_timestamps() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let (a, b) = (registry.register(&clock), registry.register(&clock));
        assert_ne!(a.id(), b.id());
        assert_ne!(a.begin_ts(), b.begin_ts());
        assert_eq!(registry.active_count(), 2);
    }

    #[test]
    fn unregister_in_arbitrary_order_keeps_minimum_correct() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        // Registered in order, so begin_ts values are 1, 2, 3.
        let mut live: Vec<TxHandle> = (0..3).map(|_| registry.register(&clock)).collect();
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        // Finishing the middle transaction leaves the oldest in charge.
        drop(live.remove(1));
        assert_eq!(registry.min_active_begin_ts(), Some(1));

        // Finishing the oldest jumps the minimum straight to the newest.
        drop(live.remove(0));
        assert_eq!(registry.min_active_begin_ts(), Some(3));

        drop(live);
        assert_eq!(registry.min_active_begin_ts(), None);
    }

    #[test]
    fn registry_is_send_and_sync() {
        // Compile-time only: registry clones are moved into worker
        // threads, and handles may travel with them.
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<ActiveTxRegistry>();
        require_send_sync::<TxHandle>();
    }

    /// Many concurrent registrations: every begin_ts must be unique, and
    /// the registry's count must match the number of live handles.
    #[test]
    fn concurrent_registrations_are_consistent() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 100;
        const TOTAL: usize = THREADS * PER_THREAD;

        let clock = Arc::new(MvccClock::new(0));
        let registry = ActiveTxRegistry::new();

        let workers: Vec<thread::JoinHandle<Vec<TxHandle>>> = (0..THREADS)
            .map(|_| {
                let clock = Arc::clone(&clock);
                let registry = registry.clone();
                // Each worker returns its handles instead of dropping
                // them, so every registration is still live afterwards.
                thread::spawn(move || {
                    (0..PER_THREAD)
                        .map(|_| registry.register(&clock))
                        .collect::<Vec<TxHandle>>()
                })
            })
            .collect();

        let mut live: Vec<TxHandle> = Vec::with_capacity(TOTAL);
        for worker in workers {
            live.extend(worker.join().unwrap());
        }

        assert_eq!(registry.active_count(), TOTAL);
        let distinct: BTreeSet<u64> = live.iter().map(TxHandle::begin_ts).collect();
        assert_eq!(
            distinct.len(),
            TOTAL,
            "every concurrent registration must allocate a unique begin_ts"
        );

        // Releasing every handle must leave the registry empty.
        drop(live);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_begin_ts(), None);
    }

    #[test]
    fn tx_id_displays_with_prefix() {
        assert_eq!(TxId(7).to_string(), "tx7");
    }

    #[test]
    fn tx_timestamp_or_id_round_trips() {
        // Exercises the derived Eq/Copy impls, so a refactor that
        // changes the variants surfaces here.
        let stamped = TxTimestampOrId::Timestamp(42);
        let pending = TxTimestampOrId::Id(TxId(42));
        assert_ne!(stamped, pending);
        assert_eq!(stamped, stamped);
        assert_eq!(pending, pending);
    }
}