reddb_server/storage/transaction/snapshot.rs

//! MVCC Snapshot Manager (Phase 2.3 PG parity)
//!
//! Allocates monotonic transaction IDs ("xids") and tracks the set of
//! currently-active transactions. Powers the visibility rule used by
//! `UnifiedEntity::is_visible`:
//!
//! ```text
//! (xmin == 0 || xmin <= snapshot.xid) && (xmax == 0 || xmax > snapshot.xid)
//! ```
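//!
//! For example, a row stamped `xmin = 5, xmax = 0` is visible to a snapshot
//! with `xid = 7` provided xid 5 is neither in the snapshot's in-progress set
//! nor aborted; the same row with `xmax = 6` is hidden from that snapshot
//! because `xmax > snapshot.xid` does not hold.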
//!
//! For Phase 2.3 the manager is in-process only — no WAL logging of xids,
//! no crash recovery of in-flight transactions. Committed rows become
//! permanently visible because their `xmin` is ≤ every future snapshot.
//! Rolled-back rows keep their `xmin` but are flagged via the
//! `aborted_xids` set, which `is_visible` can consult. Phase 2.3.2 adds
//! the WAL integration; Phase 4 adds full ACID recovery.
//!
//! # Isolation levels
//!
//! * `ReadCommitted` — each statement takes a fresh snapshot. Good enough
//!   for most OLTP; permits non-repeatable reads across statements.
//! * `SnapshotIsolation` — one snapshot per transaction. No read skew
//!   within a transaction; write-write conflicts resolve first-committer-wins.
//! * `Serializable` — stricter conflict detection (predicate locks). Not
//!   implemented in Phase 2.3; the resolver accepts the mode but downgrades
//!   to SnapshotIsolation semantics with a logged warning.
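//!
//! In manager terms the levels differ only in *when* `SnapshotManager::snapshot`
//! is taken: `ReadCommitted` re-takes it at the top of every statement, while
//! the other two take it once right after `begin()` and reuse it for the life
//! of the transaction.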

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};

use super::coordinator::IsolationLevel;

/// Default autocommit-xid pool batch size. Each refill reserves this
/// many xids in a single `fetch_add` so back-to-back autocommit inserts
/// share one atomic op instead of paying it per row. Sized small to
/// keep a pristine `peek_next_xid()` close to the truth — VACUUM and
/// diagnostics treat reserved-but-unused xids as already-committed.
pub(crate) const AUTOCOMMIT_POOL_BATCH: u64 = 16;

/// A transaction identifier. Monotonic across the lifetime of the process.
pub type Xid = u64;

/// Reserved xid meaning "not inside a transaction" — pre-MVCC rows stamp
/// this value so they stay visible to every snapshot.
pub const XID_NONE: Xid = 0;

/// Immutable snapshot taken at transaction start or statement start.
///
/// Callers evaluate `UnifiedEntity::is_visible(snapshot.xid)` on every
/// row returned from storage to filter out rows created by concurrent
/// transactions that hadn't committed when the snapshot was taken.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// The snapshot's xid — every row with `xmin <= xid` is visible
    /// (provided its `xmax`, if set, is greater than `xid`).
    pub xid: Xid,
    /// Transactions that were still active when the snapshot was taken.
    /// Their writes must be *hidden* even when `xmin <= xid`, because
    /// the writer hadn't committed yet from this snapshot's point of view.
    pub in_progress: HashSet<Xid>,
}

impl Snapshot {
    /// Is a row with this xmin/xmax visible under this snapshot?
    ///
    /// Delegates to [`super::visibility::is_visible`] — the deep
    /// module that owns the full MVCC visibility predicate. The
    /// `aborted` argument is empty here because `Snapshot` does not
    /// carry the manager-level aborted set; callers that need the
    /// rolled-back-writer rule should consult [`SnapshotManager`]
    /// directly, or evolve `Snapshot` to embed an aborted view.
    pub fn sees(&self, xmin: Xid, xmax: Xid) -> bool {
        super::visibility::is_visible(xmin, xmax, self.xid, &self.in_progress, &HashSet::new())
    }
}

/// Per-transaction state tracked on the runtime while an explicit
/// BEGIN … COMMIT/ROLLBACK transaction is open. Attached to a connection
/// via `RuntimeInner::tx_contexts`.
#[derive(Debug, Clone)]
pub struct TxnContext {
    pub xid: Xid,
    pub isolation: IsolationLevel,
    /// Snapshot captured at BEGIN (SnapshotIsolation / Serializable) or
    /// refreshed per-statement (ReadCommitted).
    pub snapshot: Snapshot,
    /// Ordered list of `(savepoint_name, sub_xid)` entries (Phase
    /// 2.3.2e savepoints). Each SAVEPOINT pushes a freshly-allocated
    /// xid onto this stack; writes stamp xmin/xmax with the top entry
    /// so ROLLBACK TO SAVEPOINT can mark only those writes as aborted.
    /// RELEASE SAVEPOINT pops the named level plus everything above it
    /// without aborting — the sub-xids keep their effects and commit
    /// together with the parent. An empty stack means "writes use `xid`
    /// directly", matching pre-savepoint behaviour.
    pub savepoints: Vec<(String, Xid)>,
    /// Sub-xids popped by `RELEASE SAVEPOINT` that should still commit
    /// alongside the parent. PG semantics: released subtxns keep their
    /// writes — they're promoted to parent-visible at COMMIT. Stored
    /// separately from `savepoints` so their names are gone (cannot be
    /// rolled back or released again) while their xids remain trackable.
    pub released_sub_xids: Vec<Xid>,
}

impl TxnContext {
    /// Xid that new writes on this connection should stamp onto tuples —
    /// the innermost open savepoint, or the parent xid when no savepoint
    /// is active.
    pub fn writer_xid(&self) -> Xid {
        self.savepoints.last().map(|(_, x)| *x).unwrap_or(self.xid)
    }
}

/// Central allocator and liveness tracker.
///
/// Uses an atomic counter for xid allocation and a parking_lot
/// `RwLock`-guarded state struct for in-progress/aborted/pinned
/// bookkeeping. The sets stay small — only unfinished transactions plus
/// a finite rollback history — so plain `HashSet`s outperform more
/// complex data structures here.
///
/// # Autocommit xid pool
///
/// Single-row autocommit writes (`MutationEngine::append_one`) need an
/// xid that's "born committed" — they call `begin()` then `commit()`
/// back-to-back before the row is even durable. The pre-commit pool
/// (`autocommit_pool`) batches the reservation: one
/// `next_xid.fetch_add(BATCH)` reserves a contiguous range of xids,
/// each handed out under a short pool-mutex hold without ever touching
/// the `RwLock<ManagerState>`. Pool xids are never inserted into `active`
/// or `aborted` so they look like already-committed transactions to
/// every snapshot — identical visibility semantics to the legacy
/// `begin()/commit()` pair (which also leaves the xid in neither set).
pub struct SnapshotManager {
    next_xid: AtomicU64,
    state: parking_lot::RwLock<ManagerState>,
    /// Reservation window for the autocommit pool. A single
    /// `parking_lot::Mutex` protects two `u64`s — `next` (next xid to
    /// hand out) and `end` (exclusive upper bound). When `next == end`
    /// the next caller refills by reserving `AUTOCOMMIT_POOL_BATCH`
    /// xids in a single `next_xid.fetch_add`, dropping the cost from
    /// two `state.write()` acquisitions per xid (the legacy
    /// `begin()`+`commit()` pair) to one brief mutex hold per xid and
    /// one `fetch_add` per `AUTOCOMMIT_POOL_BATCH` xids. A plain Mutex
    /// is enough here — the critical section is a couple of loads and
    /// stores plus, on refill, one atomic add, and contention is
    /// bounded by the writer count.
    autocommit_pool: parking_lot::Mutex<AutocommitPool>,
}

#[derive(Default)]
struct AutocommitPool {
    next: Xid,
    end: Xid,
}

#[derive(Default)]
struct ManagerState {
    /// xids that have started but not yet committed/rolled back.
    active: HashSet<Xid>,
    /// xids that rolled back. `is_visible` MUST treat these as invisible
    /// (the writer never committed). The set is pruned lazily by VACUUM.
    aborted: HashSet<Xid>,
    /// xids that must NOT be reclaimed by VACUUM because some higher-level
    /// object (a VCS commit, a long-lived replica snapshot) still points
    /// at them. Reference-counted so multiple pins coexist; decrementing
    /// to zero removes the entry. `prune_aborted` skips any xid present
    /// here so its row versions stay readable.
    pinned: HashMap<Xid, u32>,
}

impl SnapshotManager {
    pub fn new() -> Self {
        Self {
            // Start at 1 so xid=0 keeps its pre-MVCC "everyone sees it" meaning.
            next_xid: AtomicU64::new(1),
            state: parking_lot::RwLock::new(ManagerState::default()),
            // Pool starts empty — first caller triggers a refill.
            autocommit_pool: parking_lot::Mutex::new(AutocommitPool::default()),
        }
    }

    /// Allocate a new xid and mark it active. Returns the xid for
    /// stamping onto `UnifiedEntity::xmin/xmax`.
    pub fn begin(&self) -> Xid {
        let xid = self.next_xid.fetch_add(1, Ordering::Relaxed);
        self.state.write().active.insert(xid);
        xid
    }

    /// Capture a point-in-time snapshot. Must be called after `begin()`
    /// when using SnapshotIsolation/Serializable. ReadCommitted refreshes
    /// this per statement via the same call.
    pub fn snapshot(&self, xid: Xid) -> Snapshot {
        let state = self.state.read();
        // Active xids other than our own appear as "in-progress" to us.
        let in_progress: HashSet<Xid> =
            state.active.iter().copied().filter(|&x| x != xid).collect();
        Snapshot { xid, in_progress }
    }

    /// Mark a transaction as committed. Its writes become visible to
    /// future snapshots; earlier snapshots keep their own view.
    pub fn commit(&self, xid: Xid) {
        let mut state = self.state.write();
        state.active.remove(&xid);
        // Also clear from the aborted set in case a prior rollback_to call
        // touched this xid (defensive; normally a no-op).
        state.aborted.remove(&xid);
    }

    /// Allocate an xid that is *born committed* — for autocommit
    /// callers (`MutationEngine::append_one`) that previously paid two
    /// `state.write()` lock acquisitions per row to insert-then-remove
    /// from the active set.
    ///
    /// The returned xid is never inserted into `active` and never into
    /// `aborted`, which matches the steady state of the legacy
    /// `begin()/commit()` pair when called back-to-back: the xid leaves
    /// the manager's tracking sets unobservably. Concurrent readers
    /// therefore see it as an already-committed transaction once
    /// `xmin <= snapshot.xid`, which is exactly the semantics the
    /// autocommit path needs.
    ///
    /// Implementation: a contiguous window of `AUTOCOMMIT_POOL_BATCH`
    /// xids is reserved with one `fetch_add`. Each caller briefly takes
    /// the pool mutex and hands itself the next xid from the window.
    /// When the window drains, the same critical section bumps
    /// `next_xid` by another batch, so the global counter is touched
    /// once per `AUTOCOMMIT_POOL_BATCH` allocations and the
    /// `RwLock<ManagerState>` is never touched at all.
    ///
    /// Durability note: this method does NOT make the row durable —
    /// it only allocates the identifier. The caller must complete the
    /// usual WAL-append + fsync cycle before acknowledging the write.
    /// Pre-allocating the xid is safe because the xid carries no
    /// promise that any row exists; it's just a number for `xmin`.
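    ///
    /// Worked example with `AUTOCOMMIT_POOL_BATCH = 16`: if `next_xid`
    /// is 5, the first call reserves the window 5..21 (one `fetch_add`),
    /// returns 5, and the next fifteen calls return 6..=20 without
    /// touching `next_xid`; the seventeenth call reserves 21..37.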
    pub fn allocate_committed_xid(&self) -> Xid {
        let mut pool = self.autocommit_pool.lock();
        if pool.next >= pool.end {
            // Reserve the next contiguous range. A single
            // `fetch_add(BATCH)` on the global counter — equivalent to
            // BATCH back-to-back `begin()` calls in terms of xid
            // numbering, but with zero `state.write()` traffic.
            let start = self
                .next_xid
                .fetch_add(AUTOCOMMIT_POOL_BATCH, Ordering::Relaxed);
            pool.next = start;
            pool.end = start + AUTOCOMMIT_POOL_BATCH;
        }
        let xid = pool.next;
        pool.next += 1;
        xid
    }

    /// Mark a transaction as rolled back. Its writes MUST stay hidden
    /// from every future read — `is_visible` consults the aborted set
    /// before honouring a row's `xmin`.
    pub fn rollback(&self, xid: Xid) {
        let mut state = self.state.write();
        state.active.remove(&xid);
        state.aborted.insert(xid);
    }

    /// Is this xid known to have rolled back? Called by the read path to
    /// skip tuples whose creator never committed.
    pub fn is_aborted(&self, xid: Xid) -> bool {
        self.state.read().aborted.contains(&xid)
    }

    /// Is this xid still active?
    pub fn is_active(&self, xid: Xid) -> bool {
        self.state.read().active.contains(&xid)
    }

    /// Oldest still-active xid, if any (for VACUUM's oldest-active-xid
    /// calculation — any row with `xmax < min(active)` is reclaimable).
    pub fn oldest_active_xid(&self) -> Option<Xid> {
        self.state.read().active.iter().copied().min()
    }

    /// Oldest externally pinned xid. Pinned snapshots behave like active
    /// snapshots for VACUUM: any tuple visible to that xid must survive even
    /// when no SQL transaction is currently active.
    pub fn oldest_pinned_xid(&self) -> Option<Xid> {
        self.state.read().pinned.keys().copied().min()
    }

    /// Return the next xid that would be allocated. Useful for diagnostics
    /// and for VACUUM to know the upper bound of aborted-xid retention.
    pub fn peek_next_xid(&self) -> Xid {
        self.next_xid.load(Ordering::Relaxed)
    }

    /// Advance the allocator so future snapshots consider an xid
    /// recovered from storage/WAL to be in the committed past.
    pub fn observe_committed_xid(&self, xid: Xid) {
        if xid == XID_NONE {
            return;
        }
        let target = xid.saturating_add(1);
        let mut current = self.next_xid.load(Ordering::Relaxed);
        while current < target {
            match self.next_xid.compare_exchange(
                current,
                target,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    /// Prune the aborted-xid set. Safe to call once every aborted xid is
    /// below `oldest_active`, which guarantees no live snapshot depends
    /// on the distinction between "aborted" and "never existed". Pinned
    /// xids are always retained so higher-level references (VCS commits,
    /// replica snapshots) stay readable.
    pub fn prune_aborted(&self, below: Xid) {
        let mut state = self.state.write();
        let ManagerState {
            aborted, pinned, ..
        } = &mut *state;
        aborted.retain(|&x| x >= below || pinned.contains_key(&x));
    }

    /// Pin an xid so its row versions stay reclaim-safe across VACUUM.
    /// Reference-counted — call `unpin` once per `pin` to release.
    pub fn pin(&self, xid: Xid) {
        if xid == XID_NONE {
            return;
        }
        let mut state = self.state.write();
        *state.pinned.entry(xid).or_insert(0) += 1;
    }

    /// Decrement an xid's pin count. At zero it is removed and becomes
    /// VACUUM-eligible again. No-op if the xid was never pinned.
    pub fn unpin(&self, xid: Xid) {
        if xid == XID_NONE {
            return;
        }
        let mut state = self.state.write();
        if let Some(count) = state.pinned.get_mut(&xid) {
            if *count <= 1 {
                state.pinned.remove(&xid);
            } else {
                *count -= 1;
            }
        }
    }

    /// Is this xid currently pinned?
    pub fn is_pinned(&self, xid: Xid) -> bool {
        self.state.read().pinned.contains_key(&xid)
    }

    /// Current pin count for an xid (0 if not pinned). Diagnostic only.
    pub fn pin_count(&self, xid: Xid) -> u32 {
        self.state.read().pinned.get(&xid).copied().unwrap_or(0)
    }
}

impl Default for SnapshotManager {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn xids_are_monotonic() {
        let m = SnapshotManager::new();
        let a = m.begin();
        let b = m.begin();
        let c = m.begin();
        assert!(a < b && b < c);
    }

    #[test]
    fn snapshot_excludes_concurrent_writers() {
        let m = SnapshotManager::new();
        let writer = m.begin();
        let reader = m.begin();
        let snap = m.snapshot(reader);
        // Writer is active from reader's perspective → in_progress set.
        assert!(snap.in_progress.contains(&writer));
        // A row written by `writer` with xmin=writer must be invisible.
        assert!(!snap.sees(writer, XID_NONE));
    }

    #[test]
    fn committed_rows_become_visible() {
        let m = SnapshotManager::new();
        let writer = m.begin();
        m.commit(writer);
        let reader = m.begin();
        let snap = m.snapshot(reader);
        // Row stamped with writer's xid is now visible (writer < reader and committed).
        assert!(snap.sees(writer, XID_NONE));
    }

    #[test]
    fn rolled_back_writers_stay_hidden() {
        let m = SnapshotManager::new();
        let writer = m.begin();
        m.rollback(writer);
        assert!(m.is_aborted(writer));
        // Future callers skip tuples with xmin == writer by also consulting is_aborted.
    }

    #[test]
    fn pre_mvcc_rows_always_visible() {
        let m = SnapshotManager::new();
        let reader = m.begin();
        let snap = m.snapshot(reader);
        assert!(snap.sees(XID_NONE, XID_NONE));
    }

    #[test]
    fn deletion_xmax_respected() {
        let m = SnapshotManager::new();
        let creator = m.begin();
        m.commit(creator);
        let deleter = m.begin();
        m.commit(deleter);
        let reader = m.begin();
        let snap = m.snapshot(reader);
        // Reader opens *after* delete → row must be hidden.
        assert!(!snap.sees(creator, deleter));
    }

    #[test]
    fn pin_blocks_prune_of_aborted_xid() {
        let m = SnapshotManager::new();
        let writer = m.begin();
        m.rollback(writer);
        assert!(m.is_aborted(writer));
        m.pin(writer);
        // Even with a high watermark, pinned xid survives prune.
        m.prune_aborted(writer + 1);
        assert!(m.is_aborted(writer));
        m.unpin(writer);
        m.prune_aborted(writer + 1);
        assert!(!m.is_aborted(writer));
    }

    #[test]
    fn pin_is_reference_counted() {
        let m = SnapshotManager::new();
        let x = m.begin();
        m.pin(x);
        m.pin(x);
        assert_eq!(m.pin_count(x), 2);
        m.unpin(x);
        assert_eq!(m.pin_count(x), 1);
        assert!(m.is_pinned(x));
        m.unpin(x);
        assert_eq!(m.pin_count(x), 0);
        assert!(!m.is_pinned(x));
        // Extra unpin is a no-op.
        m.unpin(x);
        assert_eq!(m.pin_count(x), 0);
    }

    #[test]
    fn pin_xid_none_is_noop() {
        let m = SnapshotManager::new();
        m.pin(XID_NONE);
        assert!(!m.is_pinned(XID_NONE));
        assert_eq!(m.pin_count(XID_NONE), 0);
    }

    #[test]
    fn allocate_committed_xid_is_monotonic_and_unique() {
        let m = SnapshotManager::new();
        let mut seen = HashSet::new();
        let mut last = 0u64;
        // Drive at least three pool refills (BATCH=16 → 50 covers it).
        for _ in 0..50 {
            let x = m.allocate_committed_xid();
            assert!(x > last, "xids must be strictly increasing: {x} > {last}");
            assert!(seen.insert(x), "duplicate xid handed out: {x}");
            last = x;
        }
    }

    #[test]
    fn allocate_committed_xid_skips_active_set() {
        let m = SnapshotManager::new();
        let _x = m.allocate_committed_xid();
        // Pool xids must never appear in the active set — they are
        // born committed. `oldest_active_xid` reflects only `begin()`
        // callers (real BEGIN-wrapped transactions).
        assert_eq!(m.oldest_active_xid(), None);
    }

    #[test]
    fn allocate_committed_xid_visible_to_subsequent_snapshots() {
        let m = SnapshotManager::new();
        let writer = m.allocate_committed_xid();
        let reader = m.begin();
        let snap = m.snapshot(reader);
        // Pool xid appears in neither in_progress nor aborted, and is
        // visible because writer < reader. This matches the legacy
        // begin()+commit() pair's visibility exactly.
        assert!(!snap.in_progress.contains(&writer));
        assert!(!m.is_aborted(writer));
        assert!(snap.sees(writer, XID_NONE));
    }

    #[test]
    fn allocate_committed_xid_does_not_block_concurrent_begin() {
        // Smoke test: an open BEGIN-wrapped tx coexists with pool
        // allocation; pool xids end up between the begin and commit
        // without being added to `active`.
        let m = SnapshotManager::new();
        let tx = m.begin();
        let auto1 = m.allocate_committed_xid();
        let auto2 = m.allocate_committed_xid();
        m.commit(tx);
        assert!(tx < auto1 && auto1 < auto2);
        // `active` should be empty after commit.
        assert_eq!(m.oldest_active_xid(), None);
    }
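
    /// Sketch of the savepoint stack described on `TxnContext::writer_xid`.
    /// Assumes `IsolationLevel::ReadCommitted` is a unit variant, as the
    /// module docs name it; the coordinator's actual enum may differ.
    #[test]
    fn writer_xid_follows_innermost_savepoint() {
        let m = SnapshotManager::new();
        let xid = m.begin();
        let mut ctx = TxnContext {
            xid,
            isolation: IsolationLevel::ReadCommitted,
            snapshot: m.snapshot(xid),
            savepoints: Vec::new(),
            released_sub_xids: Vec::new(),
        };
        // No savepoints: writes stamp the parent xid.
        assert_eq!(ctx.writer_xid(), xid);
        // SAVEPOINT pushes a fresh sub-xid; writes now stamp the top entry.
        let sub = m.begin();
        ctx.savepoints.push(("sp1".to_string(), sub));
        assert_eq!(ctx.writer_xid(), sub);
        // Popping the savepoint falls back to the parent xid.
        ctx.savepoints.pop();
        assert_eq!(ctx.writer_xid(), xid);
    }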

    #[test]
    fn oldest_active_is_min_live_xid() {
        let m = SnapshotManager::new();
        let a = m.begin();
        let b = m.begin();
        assert_eq!(m.oldest_active_xid(), Some(a));
        m.commit(a);
        assert_eq!(m.oldest_active_xid(), Some(b));
        m.commit(b);
        assert_eq!(m.oldest_active_xid(), None);
    }
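
    /// Sketch of the recovery path documented on `observe_committed_xid`:
    /// replaying a committed xid from storage/WAL must push the allocator
    /// past it so future xids (and snapshots) treat it as the committed past.
    #[test]
    fn observe_committed_xid_advances_allocator() {
        let m = SnapshotManager::new();
        // Pretend recovery saw xid 100 stamped on a durable row.
        m.observe_committed_xid(100);
        assert!(m.peek_next_xid() > 100);
        // Newly allocated xids land after the observed one...
        let next = m.begin();
        assert!(next > 100);
        // ...and a row stamped with the recovered xid is visible to them.
        assert!(m.snapshot(next).sees(100, XID_NONE));
        // Observing an older xid never moves the allocator backwards.
        m.observe_committed_xid(5);
        assert!(m.peek_next_xid() > 100);
        // XID_NONE is ignored entirely.
        m.observe_committed_xid(XID_NONE);
        assert!(m.peek_next_xid() > 100);
    }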
}