// reddb_server/storage/transaction/snapshot.rs

//! MVCC Snapshot Manager (Phase 2.3 PG parity)
//!
//! Allocates monotonic transaction IDs ("xids") and tracks the set of
//! currently-active transactions. Powers the visibility rule used by
//! `UnifiedEntity::is_visible`:
//!
//! ```text
//! (xmin == 0 || xmin <= snapshot.xid) && (xmax == 0 || xmax > snapshot.xid)
//! ```
//!
//! For Phase 2.3 the manager is in-process only — no WAL logging of xids,
//! no crash recovery of in-flight transactions. Committed rows become
//! permanently visible because their `xmin` is ≤ every future snapshot.
//! Rolled-back rows keep their `xmin` but are flagged via the
//! `aborted_xids` set, which `is_visible` can consult. Phase 2.3.2 adds
//! the WAL integration; Phase 4 adds full ACID recovery.
//!
//! # Isolation levels
//!
//! * `ReadCommitted` — each statement takes a fresh snapshot. Good enough
//!   for most OLTP; permits non-repeatable reads across statements.
//! * `SnapshotIsolation` — one snapshot per transaction. No read skew
//!   within a transaction; writes conflict on first-committer-wins.
//! * `Serializable` — stricter conflict detection (predicate locks). Not
//!   implemented in Phase 2.3; the resolver accepts the mode but downgrades
//!   to SnapshotIsolation semantics with a logged warning.
27
28use std::collections::{HashMap, HashSet};
29use std::sync::atomic::{AtomicU64, Ordering};
30
31use super::coordinator::IsolationLevel;
32
/// Default autocommit-xid pool batch size. Each refill reserves this
/// many xids in a single `fetch_add` so back-to-back autocommit inserts
/// share one atomic op instead of paying it per row. Sized small to
/// keep a pristine `peek_next_xid()` close to the truth — VACUUM and
/// diagnostics treat reserved-but-unused xids as already-committed.
pub(crate) const AUTOCOMMIT_POOL_BATCH: u64 = 16;

/// A transaction identifier. Monotonic across the lifetime of the process.
/// A plain `u64` alias (not a newtype) so it stamps directly into
/// `xmin`/`xmax` fields and compares with ordinary integer operators.
pub type Xid = u64;

/// Reserved xid meaning "not inside a transaction" — pre-MVCC rows stamp
/// this value so they stay visible to every snapshot. Also stamped as the
/// `xmax` of a row that has not been deleted.
pub const XID_NONE: Xid = 0;
46
/// Immutable snapshot taken at transaction start or statement start.
///
/// Callers evaluate `UnifiedEntity::is_visible(snapshot.xid)` on every
/// row returned from storage to filter out rows created by concurrent
/// transactions that hadn't committed when the snapshot was taken.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// The snapshot's xid — every row with `xmin <= xid` created before
    /// the snapshot is visible (assuming `xmax` hasn't passed).
    pub xid: Xid,
    /// Transactions that were still active when the snapshot was taken.
    /// Their writes must be *hidden* even when `xmin <= xid`, because
    /// the writer hadn't committed yet from this snapshot's point of view.
    /// Never contains `xid` itself — `SnapshotManager::snapshot` filters
    /// the owner out so a transaction always sees its own writes.
    pub in_progress: HashSet<Xid>,
}
62
63impl Snapshot {
64 /// Is a row with this xmin/xmax visible under this snapshot?
65 ///
66 /// Delegates to [`super::visibility::is_visible`] — the deep
67 /// module that owns the full MVCC visibility predicate. The
68 /// `aborted` argument is empty here because `Snapshot` does not
69 /// carry the manager-level aborted set; callers that need the
70 /// rolled-back-writer rule should consult [`SnapshotManager`]
71 /// directly, or evolve `Snapshot` to embed an aborted view.
72 pub fn sees(&self, xmin: Xid, xmax: Xid) -> bool {
73 super::visibility::is_visible(xmin, xmax, self.xid, &self.in_progress, &HashSet::new())
74 }
75}
76
/// Per-transaction state tracked on the runtime while BEGIN/COMMIT/ROLLBACK
/// is active. Attached to a connection via `RuntimeInner::tx_contexts`.
#[derive(Debug, Clone)]
pub struct TxnContext {
    /// The parent transaction's xid, allocated at BEGIN.
    pub xid: Xid,
    /// Isolation level chosen at BEGIN (see the module docs).
    pub isolation: IsolationLevel,
    /// Snapshot captured at BEGIN (SnapshotIsolation / Serializable) or
    /// refreshed per-statement (ReadCommitted).
    pub snapshot: Snapshot,
    /// Ordered list of `(savepoint_name, sub_xid)` entries (Phase
    /// 2.3.2e savepoints). Each SAVEPOINT pushes a freshly-allocated
    /// xid onto this stack; writes stamp xmin/xmax with the top entry
    /// so ROLLBACK TO SAVEPOINT can mark only those writes as aborted.
    /// RELEASE SAVEPOINT pops the named level plus everything above it
    /// without aborting — the sub-xids keep their effects and commit
    /// together with the parent. Empty stack means "writes use `xid`
    /// directly", matching pre-savepoint behaviour.
    pub savepoints: Vec<(String, Xid)>,
    /// Sub-xids popped by `RELEASE SAVEPOINT` that should still commit
    /// alongside the parent. PG semantics: released subtxns keep their
    /// writes — they're promoted to parent-visible at COMMIT. Stored
    /// separately from `savepoints` so their names are gone (cannot be
    /// rolled back or released again) while their xids remain trackable.
    pub released_sub_xids: Vec<Xid>,
}
102
103impl TxnContext {
104 /// Xid new writes in this connection should stamp onto tuples — the
105 /// innermost open savepoint, or the parent xid when no savepoint is
106 /// active.
107 pub fn writer_xid(&self) -> Xid {
108 self.savepoints.last().map(|(_, x)| *x).unwrap_or(self.xid)
109 }
110}
111
/// Central allocator and liveness tracker.
///
/// Uses an atomic counter for xid allocation and a parking_lot-guarded
/// HashSet for in-progress/aborted bookkeeping. The sets stay small —
/// only unfinished transactions plus a finite rollback history — so a
/// plain HashSet outperforms more complex data structures here.
///
/// # Autocommit xid pool
///
/// Single-row autocommit writes (`MutationEngine::append_one`) need an
/// xid that's "born committed" — they call `begin()` then `commit()`
/// back-to-back before the row is even durable. The pre-commit pool
/// (`autocommit_pool`) batches the reservation: one
/// `next_xid.fetch_add(BATCH)` reserves a contiguous range of xids,
/// each handed out via a single atomic without touching the
/// `RwLock<ManagerState>`. Pool xids are never inserted into `active`
/// or `aborted` so they look like already-committed transactions to
/// every snapshot — identical visibility semantics to the legacy
/// `begin()/commit()` pair (which also leaves the xid in neither set).
pub struct SnapshotManager {
    /// Global allocator: next unreserved xid. Only ever moves forward.
    next_xid: AtomicU64,
    /// Active/aborted/pinned bookkeeping (see `ManagerState`).
    state: parking_lot::RwLock<ManagerState>,
    /// Reservation window for the autocommit pool. A single
    /// `parking_lot::Mutex` protects two `u64`s — `next` (next xid to
    /// hand out) and `end` (exclusive upper bound). When `next == end`
    /// the next caller refills by reserving `AUTOCOMMIT_POOL_BATCH`
    /// xids in a single `next_xid.fetch_add`, dropping the lock cost
    /// from one acquire-per-xid (the legacy `begin()`+`commit()` pair)
    /// to one acquire-per-`AUTOCOMMIT_POOL_BATCH` xids. A plain Mutex
    /// is enough here — the critical section is two stores and an
    /// atomic add, and contention is bounded by the writer count.
    autocommit_pool: parking_lot::Mutex<AutocommitPool>,
}
145
/// Reservation window state for `SnapshotManager::allocate_committed_xid`.
/// `next == end` (the `Default` of two zeros included) means the pool is
/// drained and the next caller must refill it.
#[derive(Default)]
struct AutocommitPool {
    /// Next xid to hand out from the reserved range.
    next: Xid,
    /// Exclusive upper bound of the reserved range.
    end: Xid,
}
151
/// Mutable bookkeeping behind `SnapshotManager::state`.
#[derive(Default)]
struct ManagerState {
    /// xids that have started but not yet committed/rolled back.
    active: HashSet<Xid>,
    /// xids that rolled back. `is_visible` MUST treat these as invisible
    /// (the writer never committed). The set is pruned lazily by VACUUM
    /// via `prune_aborted`.
    aborted: HashSet<Xid>,
    /// xids that must NOT be reclaimed by VACUUM because some higher-level
    /// object (a VCS commit, a long-lived replica snapshot) still points
    /// at them. Reference-counted so multiple pins coexist; decrementing
    /// to zero removes the entry. `prune_aborted` skips any xid present
    /// here so its row versions stay readable.
    pinned: HashMap<Xid, u32>,
}
166
167impl SnapshotManager {
168 pub fn new() -> Self {
169 Self {
170 // Start at 1 so xid=0 keeps its pre-MVCC "everyone sees it" meaning.
171 next_xid: AtomicU64::new(1),
172 state: parking_lot::RwLock::new(ManagerState::default()),
173 // Pool starts empty — first caller triggers a refill.
174 autocommit_pool: parking_lot::Mutex::new(AutocommitPool::default()),
175 }
176 }
177
178 /// Allocate a new xid and mark it active. Returns the xid for
179 /// stamping onto `UnifiedEntity::xmin/xmax`.
180 pub fn begin(&self) -> Xid {
181 let xid = self.next_xid.fetch_add(1, Ordering::Relaxed);
182 self.state.write().active.insert(xid);
183 xid
184 }
185
186 /// Capture a point-in-time snapshot. Must be called after `begin()`
187 /// when using SnapshotIsolation/Serializable. ReadCommitted refreshes
188 /// this per statement via the same call.
189 pub fn snapshot(&self, xid: Xid) -> Snapshot {
190 let state = self.state.read();
191 // Active xids other than our own appear as "in-progress" to us.
192 let in_progress: HashSet<Xid> =
193 state.active.iter().copied().filter(|&x| x != xid).collect();
194 Snapshot { xid, in_progress }
195 }
196
197 /// Mark a transaction as committed. Its writes become visible to
198 /// future snapshots; earlier snapshots keep their own view.
199 pub fn commit(&self, xid: Xid) {
200 let mut state = self.state.write();
201 state.active.remove(&xid);
202 // Also clear from aborted set in case of prior rollback_to call
203 // that touched this xid (defensive; normally a no-op).
204 state.aborted.remove(&xid);
205 }
206
207 /// Allocate an xid that is *born committed* — for autocommit
208 /// callers (`MutationEngine::append_one`) that previously paid two
209 /// `state.write()` lock acquisitions per row to insert-then-remove
210 /// from the active set.
211 ///
212 /// The returned xid is never inserted into `active` and never into
213 /// `aborted`, which matches the steady state of the legacy
214 /// `begin()/commit()` pair when called back-to-back: the xid leaves
215 /// the manager's tracking sets unobservably. Concurrent readers
216 /// therefore see it as an already-committed transaction once
217 /// `xmin <= snapshot.xid`, which is exactly the semantics the
218 /// autocommit path needs.
219 ///
220 /// Implementation: a small reservation pool (`AUTOCOMMIT_POOL_BATCH`
221 /// xids) is reserved with one `fetch_add`. Each caller hands itself
222 /// the next xid via a single atomic. When the pool drains, the
223 /// next caller serialises briefly through `autocommit_pool_refill`
224 /// to bump the window, then falls back into the lock-free hot path.
225 ///
226 /// Durability note: this method does NOT make the row durable —
227 /// it only allocates the identifier. The caller must complete the
228 /// usual WAL-append + fsync cycle before acknowledging the write.
229 /// Pre-allocating the xid is safe because the xid carries no
230 /// promise that any row exists; it's just a number for `xmin`.
231 pub fn allocate_committed_xid(&self) -> Xid {
232 let mut pool = self.autocommit_pool.lock();
233 if pool.next >= pool.end {
234 // Reserve the next contiguous range. A single
235 // `fetch_add(BATCH)` on the global counter — equivalent to
236 // BATCH back-to-back `begin()` calls in terms of xid
237 // numbering, but with zero `state.write()` traffic.
238 let start = self
239 .next_xid
240 .fetch_add(AUTOCOMMIT_POOL_BATCH, Ordering::Relaxed);
241 pool.next = start;
242 pool.end = start + AUTOCOMMIT_POOL_BATCH;
243 }
244 let xid = pool.next;
245 pool.next += 1;
246 xid
247 }
248
249 /// Mark a transaction as rolled back. Its writes MUST stay hidden
250 /// from every future read — `is_visible` consults the aborted set
251 /// before honouring a row's `xmin`.
252 pub fn rollback(&self, xid: Xid) {
253 let mut state = self.state.write();
254 state.active.remove(&xid);
255 state.aborted.insert(xid);
256 }
257
258 /// Is this xid known to have rolled back? Called by the read path to
259 /// skip tuples whose creator never committed.
260 pub fn is_aborted(&self, xid: Xid) -> bool {
261 self.state.read().aborted.contains(&xid)
262 }
263
264 /// Is this xid still active?
265 pub fn is_active(&self, xid: Xid) -> bool {
266 self.state.read().active.contains(&xid)
267 }
268
269 /// Snapshot of every still-active xid (for VACUUM oldest-active-xid
270 /// calculation — any row with `xmax < min(active)` is reclaimable).
271 pub fn oldest_active_xid(&self) -> Option<Xid> {
272 self.state.read().active.iter().copied().min()
273 }
274
275 /// Oldest externally pinned xid. Pinned snapshots behave like active
276 /// snapshots for VACUUM: any tuple visible to that xid must survive even
277 /// when no SQL transaction is currently active.
278 pub fn oldest_pinned_xid(&self) -> Option<Xid> {
279 self.state.read().pinned.keys().copied().min()
280 }
281
282 /// Return the next xid that would be allocated. Useful for diagnostics
283 /// and for VACUUM to know the upper bound of aborted-xid retention.
284 pub fn peek_next_xid(&self) -> Xid {
285 self.next_xid.load(Ordering::Relaxed)
286 }
287
288 /// Advance the allocator so future snapshots consider an xid
289 /// recovered from storage/WAL to be in the committed past.
290 pub fn observe_committed_xid(&self, xid: Xid) {
291 if xid == XID_NONE {
292 return;
293 }
294 let target = xid.saturating_add(1);
295 let mut current = self.next_xid.load(Ordering::Relaxed);
296 while current < target {
297 match self.next_xid.compare_exchange(
298 current,
299 target,
300 Ordering::Relaxed,
301 Ordering::Relaxed,
302 ) {
303 Ok(_) => return,
304 Err(actual) => current = actual,
305 }
306 }
307 }
308
309 /// Prune the aborted-xid set. Safe to call once every aborted xid is
310 /// below `oldest_active`, which guarantees no live snapshot depends
311 /// on the distinction between "aborted" and "never existed". Pinned
312 /// xids are always retained so higher-level references (VCS commits,
313 /// replica snapshots) stay readable.
314 pub fn prune_aborted(&self, below: Xid) {
315 let mut state = self.state.write();
316 let ManagerState {
317 aborted, pinned, ..
318 } = &mut *state;
319 aborted.retain(|&x| x >= below || pinned.contains_key(&x));
320 }
321
322 /// Pin an xid so its row versions stay reclaim-safe across VACUUM.
323 /// Reference-counted — call `unpin` once per `pin` to release.
324 pub fn pin(&self, xid: Xid) {
325 if xid == XID_NONE {
326 return;
327 }
328 let mut state = self.state.write();
329 *state.pinned.entry(xid).or_insert(0) += 1;
330 }
331
332 /// Decrement an xid's pin count. At zero it is removed and becomes
333 /// VACUUM-eligible again. No-op if the xid was never pinned.
334 pub fn unpin(&self, xid: Xid) {
335 if xid == XID_NONE {
336 return;
337 }
338 let mut state = self.state.write();
339 if let Some(count) = state.pinned.get_mut(&xid) {
340 if *count <= 1 {
341 state.pinned.remove(&xid);
342 } else {
343 *count -= 1;
344 }
345 }
346 }
347
348 /// Is this xid currently pinned?
349 pub fn is_pinned(&self, xid: Xid) -> bool {
350 self.state.read().pinned.contains_key(&xid)
351 }
352
353 /// Current pin count for an xid (0 if not pinned). Diagnostic only.
354 pub fn pin_count(&self, xid: Xid) -> u32 {
355 self.state.read().pinned.get(&xid).copied().unwrap_or(0)
356 }
357}
358
359impl Default for SnapshotManager {
360 fn default() -> Self {
361 Self::new()
362 }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn xids_are_monotonic() {
        let mgr = SnapshotManager::new();
        let (a, b, c) = (mgr.begin(), mgr.begin(), mgr.begin());
        assert!(a < b);
        assert!(b < c);
    }

    #[test]
    fn snapshot_excludes_concurrent_writers() {
        let mgr = SnapshotManager::new();
        let writer = mgr.begin();
        let reader = mgr.begin();
        let snap = mgr.snapshot(reader);
        // The still-open writer shows up as in-progress for the reader…
        assert!(snap.in_progress.contains(&writer));
        // …so a row stamped xmin=writer must stay hidden.
        assert!(!snap.sees(writer, XID_NONE));
    }

    #[test]
    fn committed_rows_become_visible() {
        let mgr = SnapshotManager::new();
        let writer = mgr.begin();
        mgr.commit(writer);
        let snap = mgr.snapshot(mgr.begin());
        // writer committed before the reader began → its rows are visible.
        assert!(snap.sees(writer, XID_NONE));
    }

    #[test]
    fn rolled_back_writers_stay_hidden() {
        let mgr = SnapshotManager::new();
        let writer = mgr.begin();
        mgr.rollback(writer);
        // Future readers consult is_aborted to skip tuples with xmin == writer.
        assert!(mgr.is_aborted(writer));
    }

    #[test]
    fn pre_mvcc_rows_always_visible() {
        let mgr = SnapshotManager::new();
        let snap = mgr.snapshot(mgr.begin());
        assert!(snap.sees(XID_NONE, XID_NONE));
    }

    #[test]
    fn deletion_xmax_respected() {
        let mgr = SnapshotManager::new();
        let creator = mgr.begin();
        mgr.commit(creator);
        let deleter = mgr.begin();
        mgr.commit(deleter);
        let snap = mgr.snapshot(mgr.begin());
        // The reader opened after the delete committed → row is hidden.
        assert!(!snap.sees(creator, deleter));
    }

    #[test]
    fn pin_blocks_prune_of_aborted_xid() {
        let mgr = SnapshotManager::new();
        let writer = mgr.begin();
        mgr.rollback(writer);
        assert!(mgr.is_aborted(writer));
        mgr.pin(writer);
        // Even with a watermark above it, the pinned xid survives pruning.
        mgr.prune_aborted(writer + 1);
        assert!(mgr.is_aborted(writer));
        // Once unpinned, the same prune reclaims it.
        mgr.unpin(writer);
        mgr.prune_aborted(writer + 1);
        assert!(!mgr.is_aborted(writer));
    }

    #[test]
    fn pin_is_reference_counted() {
        let mgr = SnapshotManager::new();
        let xid = mgr.begin();
        mgr.pin(xid);
        mgr.pin(xid);
        assert_eq!(mgr.pin_count(xid), 2);
        mgr.unpin(xid);
        assert_eq!(mgr.pin_count(xid), 1);
        assert!(mgr.is_pinned(xid));
        mgr.unpin(xid);
        assert_eq!(mgr.pin_count(xid), 0);
        assert!(!mgr.is_pinned(xid));
        // Unpinning past zero is a no-op.
        mgr.unpin(xid);
        assert_eq!(mgr.pin_count(xid), 0);
    }

    #[test]
    fn pin_xid_none_is_noop() {
        let mgr = SnapshotManager::new();
        mgr.pin(XID_NONE);
        assert!(!mgr.is_pinned(XID_NONE));
        assert_eq!(mgr.pin_count(XID_NONE), 0);
    }

    #[test]
    fn allocate_committed_xid_is_monotonic_and_unique() {
        let mgr = SnapshotManager::new();
        // Drive at least three pool refills (BATCH=16 → 50 covers it).
        let xids: Vec<Xid> = (0..50).map(|_| mgr.allocate_committed_xid()).collect();
        assert!(
            xids.windows(2).all(|pair| pair[0] < pair[1]),
            "xids must be strictly increasing (hence unique): {xids:?}"
        );
    }

    #[test]
    fn allocate_committed_xid_skips_active_set() {
        let mgr = SnapshotManager::new();
        let _xid = mgr.allocate_committed_xid();
        // Pool xids are born committed: they never enter `active`, so
        // `oldest_active_xid` reflects only real BEGIN-wrapped callers.
        assert_eq!(mgr.oldest_active_xid(), None);
    }

    #[test]
    fn allocate_committed_xid_visible_to_subsequent_snapshots() {
        let mgr = SnapshotManager::new();
        let writer = mgr.allocate_committed_xid();
        let snap = mgr.snapshot(mgr.begin());
        // The pool xid sits in neither in_progress nor aborted, and
        // writer < reader, so the row is visible — exactly the legacy
        // begin()+commit() pair's semantics.
        assert!(!snap.in_progress.contains(&writer));
        assert!(!mgr.is_aborted(writer));
        assert!(snap.sees(writer, XID_NONE));
    }

    #[test]
    fn allocate_committed_xid_does_not_block_concurrent_begin() {
        // Smoke test: an open BEGIN-wrapped tx coexists with pool
        // allocation; pool xids slot in after the open tx without ever
        // touching `active`.
        let mgr = SnapshotManager::new();
        let tx = mgr.begin();
        let auto_a = mgr.allocate_committed_xid();
        let auto_b = mgr.allocate_committed_xid();
        mgr.commit(tx);
        assert!(tx < auto_a);
        assert!(auto_a < auto_b);
        // `active` drains completely once the tx commits.
        assert_eq!(mgr.oldest_active_xid(), None);
    }

    #[test]
    fn oldest_active_is_min_live_xid() {
        let mgr = SnapshotManager::new();
        let first = mgr.begin();
        let second = mgr.begin();
        assert_eq!(mgr.oldest_active_xid(), Some(first));
        mgr.commit(first);
        assert_eq!(mgr.oldest_active_xid(), Some(second));
        mgr.commit(second);
        assert_eq!(mgr.oldest_active_xid(), None);
    }
}
536}