sqlrite/mvcc/store.rs
1//! [`MvStore`] — the in-memory version index sitting in front of
2//! the pager (Phase 11.3 skeleton).
3//!
4//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
5//!
6//! > The MVCC store keeps an in-memory map keyed by `RowID
7//! > { table_id, row_key }` whose value is a chain of `RowVersion`
8//! > records. Each version carries `begin`/`end` timestamps and the
9//! > row payload itself. Visibility for a reader transaction with
10//! > begin-timestamp `T` is the textbook snapshot-isolation rule:
11//! > pick the version whose `begin <= T < end`.
12//!
13//! Phase 11.3 lands the standalone data structures + visibility
14//! logic so 11.4 can plug them into:
15//!
16//! - the **executor's read path** when the connection is in MVCC
17//! journal mode (the [`super::JournalMode`] enum);
18//! - the **commit path**, which mirrors successful writes from the
19//! legacy `Database::tables` map into the MvStore at the assigned
20//! `commit_ts` and ends the previous latest version at the same
21//! timestamp.
22//!
23//! Today nothing in the executor calls into this module. The
24//! `PRAGMA journal_mode = mvcc` switch parses but doesn't change
25//! query behaviour. That's intentional — committing to a half-wired
26//! read path before the write side exists would force 11.4's
27//! commit-validation work into this PR. The two are coupled and
28//! ship together.
29//!
//! ## Why one outer `Mutex` plus one `RwLock` per chain (not a lock-free chain)
31//!
32//! v0 stores each row's version chain inside an
33//! `Arc<RwLock<Vec<RowVersion>>>`. The outer map is a
34//! `Mutex<HashMap<RowID, _>>`. Two reasons not to over-engineer:
35//!
36//! 1. The plan-doc explicitly calls this out:
37//! > One chain per row, behind `RwLock` (or `parking_lot::RwLock`).
38//! > The wait-free chain is a known follow-up; it's not on the v0
39//! > critical path.
40//! 2. The hot path is `MvStore::read`, which takes the outer lock to
41//! fetch the `Arc<RwLock<…>>`, drops it, then takes the chain's
42//! `RwLock` in read mode for the visibility scan. The outer lock
43//! is held only long enough to clone an `Arc`.
44//!
45//! When the commit path lands (11.4) and we observe contention, a
46//! sharded outer map (e.g. `dashmap`) becomes the obvious upgrade —
47//! same `RowID → chain` shape, just multiple shards. None of
48//! `MvStore`'s public surface assumes the inner storage shape, so
49//! the swap is local.
50
51use std::collections::HashMap;
52use std::sync::{Arc, Mutex, RwLock};
53
54use crate::sql::db::table::Value;
55
56use super::clock::MvccClock;
57use super::registry::{ActiveTxRegistry, TxTimestampOrId};
58
/// Key identifying one row inside the [`MvStore`]: the owning table's
/// name plus the row's rowid.
///
/// v0 keys by table *name* because the schema catalog is keyed by name
/// and the engine has no stable numeric `table_id` yet. Once one exists
/// (likely alongside 11.5's checkpoint integration, which would also stop
/// the index from carrying a `String` per row), this can become
/// `(u32, i64)`. Consumers only hash and compare `RowID`s, so that change
/// stays local.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct RowID {
    pub table: String,
    pub rowid: i64,
}

impl RowID {
    /// Builds a `RowID` from anything convertible into the table name.
    pub fn new(table: impl Into<String>, rowid: i64) -> Self {
        let table: String = table.into();
        Self { table, rowid }
    }
}
80
81/// What a [`RowVersion`] records. `Present` carries the row's
82/// column values at the moment of commit; `Tombstone` records that
83/// the row was deleted at this version's `begin` timestamp.
84///
85/// Storing column-value pairs as a `Vec<(String, Value)>` rather
86/// than `BTreeMap<String, Value>` because:
87/// - The vector preserves declaration order (stable for tests +
88/// diagnostics).
89/// - Lookups by column are rare on this path — the executor walks
90/// the row by projection order.
91#[derive(Debug, Clone, PartialEq)]
92pub enum VersionPayload {
93 /// Row exists with the given column-value pairs.
94 Present(Vec<(String, Value)>),
95 /// Row was deleted at this version's `begin` timestamp. Visible
96 /// readers see "no such row"; readers older than `begin` still
97 /// see whatever the previous version held.
98 Tombstone,
99}
100
101/// One link in a row's version chain.
102///
103/// Visibility under snapshot isolation is the textbook rule the
104/// Hekaton paper formalises and Turso's MVCC implements:
105///
106/// - `begin <= T`: the version was committed at or before the
107/// reader's begin-timestamp. (For an in-flight version
108/// `begin = Id(tx)`, only the producing transaction can see it.)
109/// - `end > T` or `end is None`: the version hasn't been superseded
110/// yet from the reader's point of view.
111///
112/// Both conditions must hold. See [`MvStore::visible_at`].
113#[derive(Debug, Clone)]
114pub struct RowVersion {
115 pub begin: TxTimestampOrId,
116 pub end: Option<TxTimestampOrId>,
117 pub payload: VersionPayload,
118}
119
120impl RowVersion {
121 /// Builds a freshly-committed version at `commit_ts` with no
122 /// `end` (i.e. currently latest). This is the shape the legacy
123 /// commit path produces in 11.4 when it mirrors a row write.
124 pub fn committed(commit_ts: u64, payload: VersionPayload) -> Self {
125 Self {
126 begin: TxTimestampOrId::Timestamp(commit_ts),
127 end: None,
128 payload,
129 }
130 }
131
132 /// Builds an in-flight version owned by `tx_id`. v0 tests use
133 /// this to construct chains by hand; the production write path
134 /// (11.4) will own it.
135 pub fn in_flight(tx_id: super::TxId, payload: VersionPayload) -> Self {
136 Self {
137 begin: TxTimestampOrId::Id(tx_id),
138 end: None,
139 payload,
140 }
141 }
142}
143
144/// A row's version chain. Newest version at the back — easy
145/// `push_version` semantics; reads scan from the back since that's
146/// where most queries' `begin_ts` lands.
147pub type RowVersionChain = Vec<RowVersion>;
148
149/// In-memory MVCC version index. Cheap to clone — the heavy state
150/// is behind `Arc`s.
151#[derive(Clone, Debug)]
152pub struct MvStore {
153 inner: Arc<MvStoreInner>,
154}
155
156#[derive(Debug)]
157struct MvStoreInner {
158 /// `RowID → version chain`. Outer `Mutex` guards the map's
159 /// shape (insert / lookup); the per-chain `RwLock` guards the
160 /// `Vec` (so two readers walking different chains don't fight,
161 /// and the writer that ends the latest version doesn't block
162 /// readers on other chains).
163 versions: Mutex<HashMap<RowID, Arc<RwLock<RowVersionChain>>>>,
164 clock: Arc<MvccClock>,
165 active: ActiveTxRegistry,
166}
167
168impl MvStore {
169 /// Builds an empty store wired to a shared clock + registry.
170 /// Phase 11.3 wires this into `Database` so every connection
171 /// observes the same version index; 11.2's `Wal::clock_high_water`
172 /// seeds the clock at open time.
173 pub fn new(clock: Arc<MvccClock>) -> Self {
174 Self {
175 inner: Arc::new(MvStoreInner {
176 versions: Mutex::new(HashMap::new()),
177 clock,
178 active: ActiveTxRegistry::new(),
179 }),
180 }
181 }
182
183 /// Convenience for tests + standalone callers — builds a store
184 /// over a freshly-allocated clock seeded at 0. The clock is
185 /// returned so the caller can `tick()` it to allocate
186 /// timestamps for hand-built versions.
187 pub fn fresh() -> (Self, Arc<MvccClock>) {
188 let clock = Arc::new(MvccClock::new(0));
189 let store = Self::new(Arc::clone(&clock));
190 (store, clock)
191 }
192
193 /// Returns the shared clock. The same `Arc` every consumer
194 /// (commit path, read path, GC) holds.
195 pub fn clock(&self) -> &Arc<MvccClock> {
196 &self.inner.clock
197 }
198
199 /// Returns the active-transaction registry. Phase 11.4 will
200 /// register `BEGIN CONCURRENT` transactions here; Phase 11.6
201 /// reads `min_active_begin_ts()` to set the GC watermark.
202 pub fn active_registry(&self) -> &ActiveTxRegistry {
203 &self.inner.active
204 }
205
206 /// Number of rows the store holds at least one version for.
207 /// Cheap diagnostic — locks only the outer map briefly.
208 pub fn tracked_rows(&self) -> usize {
209 self.lock_map().len()
210 }
211
212 /// Total versions across every chain. Linear in row count;
213 /// intended for tests + assertions, not the hot path.
214 pub fn total_versions(&self) -> usize {
215 let map = self.lock_map();
216 map.values()
217 .map(|chain| chain.read().expect("chain RwLock poisoned").len())
218 .sum()
219 }
220
221 /// Returns the version of `row_id` that's visible to a reader
222 /// transaction whose begin-timestamp is `begin_ts`, or `None`
223 /// if no version satisfies the snapshot-isolation rule.
224 ///
225 /// Snapshot-isolation visibility:
226 /// - the version's `begin` is a committed timestamp `<= begin_ts`,
227 /// and
228 /// - the version's `end` is `None` (still latest) or a committed
229 /// timestamp `> begin_ts`.
230 ///
231 /// In-flight versions (`begin = Id(_)`) are never visible to
232 /// other readers — they're a placeholder until the producing
233 /// transaction either commits (the version's `begin` is rewritten
234 /// to a `Timestamp`) or aborts (the version is dropped). The
235 /// producing transaction itself reads its own writes through a
236 /// separate path (Phase 11.4); it doesn't go through this
237 /// function.
238 ///
239 /// The chain is scanned **front to back**: in v0 we don't trust
240 /// any insertion order, so the loop must not exit early. When
241 /// the chain becomes ordered-by-`begin` (a natural property of
242 /// the commit path's append-only writes in 11.4), this can
243 /// short-circuit on the first visible version.
244 pub fn read(&self, row_id: &RowID, begin_ts: u64) -> Option<VersionPayload> {
245 let chain = {
246 let map = self.lock_map();
247 Arc::clone(map.get(row_id)?)
248 };
249 let chain = chain.read().expect("chain RwLock poisoned");
250 for v in chain.iter() {
251 if Self::visible_at(v, begin_ts) {
252 return Some(v.payload.clone());
253 }
254 }
255 None
256 }
257
258 /// Returns true if `version` is visible to a reader whose
259 /// begin-timestamp is `begin_ts`. Pure function — exposed for
260 /// tests + future GC code.
261 pub fn visible_at(version: &RowVersion, begin_ts: u64) -> bool {
262 // begin must be a committed timestamp <= begin_ts.
263 let begin_ok = match version.begin {
264 TxTimestampOrId::Timestamp(t) => t <= begin_ts,
265 TxTimestampOrId::Id(_) => false,
266 };
267 if !begin_ok {
268 return false;
269 }
270 // end must be None (still latest) OR a committed timestamp
271 // strictly > begin_ts. An in-flight `Id(_)` cap means some
272 // other transaction is in the process of superseding this
273 // version but hasn't committed yet — from the reader's
274 // perspective the version is still latest.
275 match version.end {
276 None => true,
277 Some(TxTimestampOrId::Timestamp(t)) => t > begin_ts,
278 Some(TxTimestampOrId::Id(_)) => true,
279 }
280 }
281
282 /// Returns the begin-timestamp of the latest committed version
283 /// in `row_id`'s chain, or `None` if the row has no committed
284 /// versions (the chain is empty or only carries in-flight
285 /// placeholders).
286 ///
287 /// Phase 11.4 — the commit-validation pass calls this for
288 /// every row in its write-set. If the latest committed begin
289 /// is greater than the validating transaction's `begin_ts`,
290 /// some other transaction superseded the row after our
291 /// snapshot — abort with [`crate::error::SQLRiteError::Busy`].
292 pub fn latest_committed_begin(&self, row_id: &RowID) -> Option<u64> {
293 let chain = {
294 let map = self.lock_map();
295 Arc::clone(map.get(row_id)?)
296 };
297 let chain = chain.read().expect("chain RwLock poisoned");
298 // Walk back-to-front — the latest committed version is
299 // typically the rightmost element. Skip in-flight versions
300 // (`begin = Id(_)`) — they aren't published yet.
301 chain.iter().rev().find_map(|v| match v.begin {
302 TxTimestampOrId::Timestamp(t) => Some(t),
303 TxTimestampOrId::Id(_) => None,
304 })
305 }
306
307 /// Pushes a new version onto the chain for `row_id`. Caps the
308 /// chain's previous latest version (if any) at `version.begin`
309 /// — the canonical write-side bookkeeping the commit path will
310 /// use in 11.4.
311 ///
312 /// `version.begin` must be a `Timestamp` (committed) — pushing
313 /// an in-flight version through this entry point would break
314 /// the cap rule. Use [`MvStore::push_in_flight`] for in-flight
315 /// versions; commit will rewrite their `begin` later.
316 ///
317 /// Errors if the new `begin` is `<= the previous latest's
318 /// begin` (violates monotonicity — the commit path must always
319 /// hand out increasing timestamps via the `MvccClock`).
320 pub fn push_committed(&self, row_id: RowID, version: RowVersion) -> Result<(), MvStoreError> {
321 let begin_ts = match version.begin {
322 TxTimestampOrId::Timestamp(t) => t,
323 TxTimestampOrId::Id(_) => return Err(MvStoreError::NotCommitted),
324 };
325 let chain_arc = self.get_or_create_chain(row_id);
326 let mut chain = chain_arc.write().expect("chain RwLock poisoned");
327 if let Some(prev) = chain.last() {
328 // Validate before mutating — a failed validation must
329 // not leave the chain in a half-capped state. (Earlier
330 // drafts mutated `prev.end` first, then ran these
331 // checks; equal-begin retries then surfaced as
332 // `PreviousAlreadyCapped` instead of the
333 // `NonMonotonicBegin` callers expect.)
334 let prev_begin = match prev.begin {
335 TxTimestampOrId::Timestamp(t) => t,
336 TxTimestampOrId::Id(_) => 0,
337 };
338 if begin_ts <= prev_begin {
339 return Err(MvStoreError::NonMonotonicBegin {
340 prev: prev_begin,
341 new: begin_ts,
342 });
343 }
344 match prev.end {
345 None => {}
346 Some(TxTimestampOrId::Timestamp(existing)) if existing == begin_ts => {
347 // Idempotent replay — already capped at exactly
348 // this timestamp (recovery path will hit this).
349 }
350 Some(TxTimestampOrId::Timestamp(existing)) => {
351 return Err(MvStoreError::PreviousAlreadyCapped { existing });
352 }
353 Some(TxTimestampOrId::Id(_)) => {
354 // An in-flight cap means another transaction
355 // owns the supersession; the commit path
356 // shouldn't hit this in 11.4 (validation runs
357 // first). v0 returns a typed error rather than
358 // silently overwriting.
359 return Err(MvStoreError::PreviousCappedByInFlight);
360 }
361 }
362 }
363 // Validation passed — apply the cap (if any) and push.
364 if let Some(prev) = chain.last_mut() {
365 if prev.end.is_none() {
366 prev.end = Some(TxTimestampOrId::Timestamp(begin_ts));
367 }
368 }
369 chain.push(version);
370 Ok(())
371 }
372
373 /// Pushes an in-flight version onto the chain. Used by the
374 /// 11.4 write path while a `BEGIN CONCURRENT` transaction is
375 /// open; the version's `begin` is rewritten from `Id(tx)` to
376 /// `Timestamp(commit_ts)` on commit, and the previous latest
377 /// gets capped at the same timestamp (via [`Self::push_committed`]
378 /// at commit time, after the in-flight version is removed).
379 ///
380 /// 11.3 ships this as standalone API for tests; 11.4 wires it
381 /// into the executor.
382 pub fn push_in_flight(&self, row_id: RowID, version: RowVersion) {
383 let chain_arc = self.get_or_create_chain(row_id);
384 let mut chain = chain_arc.write().expect("chain RwLock poisoned");
385 chain.push(version);
386 }
387
388 // -----------------------------------------------------------------
389 // Phase 11.6 — garbage collection
390 // -----------------------------------------------------------------
391
392 /// Returns the GC watermark — the timestamp below which any
393 /// committed-and-superseded version is reclaimable.
394 ///
395 /// - If there's at least one in-flight transaction, the
396 /// watermark is its `begin_ts` (the smallest one across the
397 /// active set). Versions whose `end` timestamp is `> watermark`
398 /// may still be visible to that reader and must be kept.
399 /// - With no in-flight transactions the watermark is `u64::MAX`,
400 /// meaning every superseded version can go (the latest version
401 /// per row stays — its `end` is `None`).
402 ///
403 /// The `+1` shift versus the strict snapshot-isolation
404 /// reclamation rule keeps the math simple: `gc_chain` retains
405 /// versions whose end-timestamp is strictly greater than the
406 /// watermark, so `watermark = u64::MAX` reclaims every version
407 /// with `end = Some(_)` cleanly.
408 pub fn active_watermark(&self) -> u64 {
409 self.inner.active.min_active_begin_ts().unwrap_or(u64::MAX)
410 }
411
412 /// Garbage-collects `row_id`'s version chain against
413 /// `watermark`. A committed version is reclaimable when its
414 /// `end` timestamp is `<= watermark` — at that point no
415 /// reader's `begin_ts` falls in the half-open `[begin, end)`
416 /// interval that the snapshot-isolation rule requires for
417 /// visibility. In-flight versions and the latest committed
418 /// version (`end == None`) are always kept.
419 ///
420 /// Returns the number of versions reclaimed. Drops the chain
421 /// from the outer map entirely if it ends up empty (no
422 /// versions left after the sweep), so the per-row entry
423 /// doesn't leak memory.
424 pub fn gc_chain(&self, row_id: &RowID, watermark: u64) -> usize {
425 let chain_arc = match self.lock_map().get(row_id) {
426 Some(arc) => Arc::clone(arc),
427 None => return 0,
428 };
429 let reclaimed = {
430 let mut chain = chain_arc.write().expect("chain RwLock poisoned");
431 let before = chain.len();
432 chain.retain(|v| match v.end {
433 // Committed-end timestamp at or below the watermark
434 // is reclaimable; anything strictly above it is
435 // still possibly visible to a reader.
436 Some(TxTimestampOrId::Timestamp(t)) => t > watermark,
437 // None (latest version) and in-flight `Id(_)` caps
438 // are always kept.
439 _ => true,
440 });
441 before - chain.len()
442 };
443 // Drop the row's outer-map entry if its chain is now
444 // empty. Cheap and avoids accumulating empty rows over
445 // long-running sessions. Re-checks `is_empty` under both
446 // locks so we don't race with a `push_committed` that's
447 // about to add a new version.
448 if reclaimed > 0 {
449 let chain_locked = chain_arc.read().expect("chain RwLock poisoned");
450 if chain_locked.is_empty() {
451 drop(chain_locked);
452 self.lock_map().remove(row_id);
453 }
454 }
455 reclaimed
456 }
457
458 /// Sweeps every row in the store against `watermark`. Returns
459 /// the total number of versions reclaimed. Used by
460 /// [`crate::Connection::vacuum_mvcc`] for an explicit full
461 /// drain; per-commit callers should prefer
462 /// [`MvStore::gc_chain`] over the rows they actually touched.
463 pub fn gc_all(&self, watermark: u64) -> usize {
464 // Snapshot the row keys upfront so we don't hold the outer
465 // map lock across the per-chain locks (avoids a long
466 // critical section that would block concurrent
467 // `push_committed` / `read` calls).
468 let row_ids: Vec<RowID> = self.lock_map().keys().cloned().collect();
469 row_ids
470 .iter()
471 .map(|rid| self.gc_chain(rid, watermark))
472 .sum()
473 }
474
475 fn get_or_create_chain(&self, row_id: RowID) -> Arc<RwLock<RowVersionChain>> {
476 let mut map = self.lock_map();
477 Arc::clone(
478 map.entry(row_id)
479 .or_insert_with(|| Arc::new(RwLock::new(Vec::new()))),
480 )
481 }
482
483 fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap<RowID, Arc<RwLock<RowVersionChain>>>> {
484 self.inner
485 .versions
486 .lock()
487 .unwrap_or_else(|e| panic!("sqlrite: MvStore versions mutex poisoned: {e}"))
488 }
489}
490
491/// Errors returned by mutating MvStore operations. Read-side calls
492/// (`read`, `visible_at`) don't error.
493#[derive(Debug, thiserror::Error, PartialEq)]
494pub enum MvStoreError {
495 /// `push_committed` got a version whose `begin` is an in-flight
496 /// `TxId` rather than a committed `Timestamp`.
497 #[error("push_committed expects a committed Timestamp, not an in-flight TxId")]
498 NotCommitted,
499
500 /// The previous latest version is already capped at a different
501 /// timestamp. Either the caller is double-committing, or the
502 /// commit path is racing with itself (which 11.4's commit-validation
503 /// loop is supposed to prevent).
504 #[error("previous latest version already capped at end_ts={existing}")]
505 PreviousAlreadyCapped { existing: u64 },
506
507 /// The previous latest's `end` is set to an in-flight cap. v0
508 /// rejects rather than silently overwriting; 11.4's commit
509 /// validation runs first so this shouldn't fire in production.
510 #[error("previous latest version is being capped by an in-flight transaction")]
511 PreviousCappedByInFlight,
512
513 /// New version's `begin` is not strictly greater than the
514 /// previous latest's `begin`. The clock should always hand out
515 /// monotonically increasing timestamps; this is a corruption /
516 /// bug indicator.
517 #[error("non-monotonic begin: previous={prev}, new={new}")]
518 NonMonotonicBegin { prev: u64, new: u64 },
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524
525 fn payload(value: i64) -> VersionPayload {
526 VersionPayload::Present(vec![("v".to_string(), Value::Integer(value))])
527 }
528
529 #[test]
530 fn empty_store_returns_none() {
531 let (store, _clock) = MvStore::fresh();
532 assert!(store.read(&RowID::new("t", 1), 100).is_none());
533 assert_eq!(store.tracked_rows(), 0);
534 assert_eq!(store.total_versions(), 0);
535 }
536
537 /// Snapshot isolation visibility — the headline rule. One row
538 /// gets two committed versions at different timestamps; readers
539 /// at varying `begin_ts` see exactly the version that satisfies
540 /// `begin <= T < end`.
541 #[test]
542 fn visibility_picks_the_right_version_for_each_begin_ts() {
543 let (store, clock) = MvStore::fresh();
544 let row = RowID::new("accounts", 1);
545
546 // V1 committed at ts=5, V2 committed at ts=10.
547 clock.observe(5);
548 store
549 .push_committed(row.clone(), RowVersion::committed(5, payload(100)))
550 .unwrap();
551 clock.observe(10);
552 store
553 .push_committed(row.clone(), RowVersion::committed(10, payload(200)))
554 .unwrap();
555
556 // Reader before V1 — nothing visible.
557 assert_eq!(store.read(&row, 4), None);
558
559 // Reader at exactly V1's begin — sees V1.
560 assert_eq!(store.read(&row, 5), Some(payload(100)));
561
562 // Reader between V1 and V2 — still sees V1 (V2's begin > T).
563 assert_eq!(store.read(&row, 9), Some(payload(100)));
564
565 // Reader at exactly V2's begin — sees V2.
566 assert_eq!(store.read(&row, 10), Some(payload(200)));
567
568 // Reader past V2 — sees V2.
569 assert_eq!(store.read(&row, 1_000), Some(payload(200)));
570 }
571
572 /// `push_committed` caps the previous latest version's `end` at
573 /// the new version's `begin`. Without this, every version's
574 /// `end` would stay None and the visibility rule would return
575 /// the oldest committed version for every reader.
576 #[test]
577 fn push_committed_caps_previous_latest() {
578 let (store, _clock) = MvStore::fresh();
579 let row = RowID::new("t", 7);
580 store
581 .push_committed(row.clone(), RowVersion::committed(2, payload(1)))
582 .unwrap();
583 store
584 .push_committed(row.clone(), RowVersion::committed(5, payload(2)))
585 .unwrap();
586 // Inspect the chain through the public API. A reader at
587 // exactly ts=4 should see V1 — that's only correct if V1's
588 // end was set to Some(Timestamp(5)).
589 assert_eq!(store.read(&row, 4), Some(payload(1)));
590 }
591
592 /// The visibility helper is pure; test it independently of
593 /// the chain to lock down the rule.
594 #[test]
595 fn visible_at_handles_each_combination() {
596 // Committed begin, no end — visible iff T >= begin.
597 let v = RowVersion {
598 begin: TxTimestampOrId::Timestamp(10),
599 end: None,
600 payload: payload(0),
601 };
602 assert!(!MvStore::visible_at(&v, 9));
603 assert!(MvStore::visible_at(&v, 10));
604 assert!(MvStore::visible_at(&v, 1_000));
605
606 // Committed begin + committed end — visible iff begin <= T < end.
607 let v = RowVersion {
608 begin: TxTimestampOrId::Timestamp(10),
609 end: Some(TxTimestampOrId::Timestamp(20)),
610 payload: payload(0),
611 };
612 assert!(!MvStore::visible_at(&v, 9));
613 assert!(MvStore::visible_at(&v, 10));
614 assert!(MvStore::visible_at(&v, 19));
615 assert!(!MvStore::visible_at(&v, 20));
616
617 // In-flight begin — invisible to outside readers regardless
618 // of `end`.
619 let v = RowVersion {
620 begin: TxTimestampOrId::Id(super::super::TxId(42)),
621 end: None,
622 payload: payload(0),
623 };
624 assert!(!MvStore::visible_at(&v, 0));
625 assert!(!MvStore::visible_at(&v, 1_000));
626
627 // In-flight cap on an otherwise-visible version — still
628 // visible (the supersession isn't committed yet).
629 let v = RowVersion {
630 begin: TxTimestampOrId::Timestamp(5),
631 end: Some(TxTimestampOrId::Id(super::super::TxId(42))),
632 payload: payload(0),
633 };
634 assert!(MvStore::visible_at(&v, 10));
635 assert!(!MvStore::visible_at(&v, 4)); // begin > T
636 }
637
638 /// Tombstone semantics: deleting the row creates a Tombstone
639 /// version. Readers older than the delete still see the value
640 /// from the previous version; readers at or after the delete
641 /// see "no row" (the tombstone payload).
642 #[test]
643 fn tombstone_versions_capture_the_delete() {
644 let (store, _clock) = MvStore::fresh();
645 let row = RowID::new("t", 1);
646 store
647 .push_committed(row.clone(), RowVersion::committed(1, payload(42)))
648 .unwrap();
649 store
650 .push_committed(
651 row.clone(),
652 RowVersion::committed(5, VersionPayload::Tombstone),
653 )
654 .unwrap();
655
656 assert_eq!(store.read(&row, 1), Some(payload(42)));
657 assert_eq!(store.read(&row, 4), Some(payload(42)));
658 assert_eq!(store.read(&row, 5), Some(VersionPayload::Tombstone));
659 assert_eq!(store.read(&row, 100), Some(VersionPayload::Tombstone));
660 }
661
662 #[test]
663 fn push_committed_rejects_in_flight_begin() {
664 let (store, _clock) = MvStore::fresh();
665 let v = RowVersion::in_flight(super::super::TxId(7), payload(0));
666 let err = store
667 .push_committed(RowID::new("t", 1), v)
668 .expect_err("in-flight begin must be rejected");
669 assert_eq!(err, MvStoreError::NotCommitted);
670 }
671
672 #[test]
673 fn push_committed_rejects_non_monotonic_begin() {
674 let (store, _clock) = MvStore::fresh();
675 let row = RowID::new("t", 1);
676 store
677 .push_committed(row.clone(), RowVersion::committed(10, payload(1)))
678 .unwrap();
679 let err = store
680 .push_committed(row.clone(), RowVersion::committed(10, payload(2)))
681 .expect_err("equal begin should be rejected");
682 assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. }));
683 let err = store
684 .push_committed(row.clone(), RowVersion::committed(5, payload(2)))
685 .expect_err("backward begin should be rejected");
686 assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. }));
687 }
688
689 /// In-flight versions don't appear to other readers — the
690 /// snapshot-isolation contract Phase 11.4 relies on. Other
691 /// readers see the previously-committed version (or None if
692 /// the chain is empty otherwise).
693 #[test]
694 fn in_flight_versions_are_invisible_to_other_readers() {
695 let (store, _clock) = MvStore::fresh();
696 let row = RowID::new("t", 1);
697 store
698 .push_committed(row.clone(), RowVersion::committed(5, payload(100)))
699 .unwrap();
700 // Simulate an in-flight write at a higher (uncommitted)
701 // timestamp via a fresh TxId. Reader at any begin_ts must
702 // still see V1.
703 store.push_in_flight(
704 row.clone(),
705 RowVersion::in_flight(super::super::TxId(99), payload(200)),
706 );
707 assert_eq!(store.read(&row, 5), Some(payload(100)));
708 assert_eq!(store.read(&row, 1_000), Some(payload(100)));
709 }
710
711 /// Tracked-row + version counters reflect the chain shape.
712 /// Cheap sanity test that 11.6's GC will rely on once it lands.
713 #[test]
714 fn tracked_rows_and_total_versions_are_accurate() {
715 let (store, _clock) = MvStore::fresh();
716 store
717 .push_committed(RowID::new("a", 1), RowVersion::committed(1, payload(0)))
718 .unwrap();
719 store
720 .push_committed(RowID::new("a", 1), RowVersion::committed(2, payload(0)))
721 .unwrap();
722 store
723 .push_committed(RowID::new("a", 2), RowVersion::committed(1, payload(0)))
724 .unwrap();
725 store
726 .push_committed(RowID::new("b", 1), RowVersion::committed(1, payload(0)))
727 .unwrap();
728 assert_eq!(store.tracked_rows(), 3);
729 assert_eq!(store.total_versions(), 4);
730 }
731
732 #[test]
733 fn store_is_send_and_sync() {
734 fn assert_send<T: Send>() {}
735 fn assert_sync<T: Sync>() {}
736 assert_send::<MvStore>();
737 assert_sync::<MvStore>();
738 }
739
740 /// Concurrent readers walking different chains must not block
741 /// each other — that's the reason for the per-chain `RwLock`
742 /// rather than one big `Mutex<HashMap>`. Smoke test: many
743 /// threads read concurrently and must all see the right
744 /// versions.
745 #[test]
746 fn concurrent_reads_see_consistent_snapshots() {
747 use std::thread;
748
749 let (store, _clock) = MvStore::fresh();
750 for rid in 0..32 {
751 let row = RowID::new("t", rid);
752 store
753 .push_committed(row.clone(), RowVersion::committed(1, payload(rid)))
754 .unwrap();
755 store
756 .push_committed(row, RowVersion::committed(10, payload(rid * 100)))
757 .unwrap();
758 }
759
760 let store_arc = Arc::new(store);
761 let handles: Vec<_> = (0..8)
762 .map(|_| {
763 let s = Arc::clone(&store_arc);
764 thread::spawn(move || {
765 for _ in 0..500 {
766 for rid in 0..32 {
767 let row = RowID::new("t", rid);
768 // Pre-supersession: V1 visible.
769 assert_eq!(s.read(&row, 5), Some(payload(rid)));
770 // Post-supersession: V2 visible.
771 assert_eq!(s.read(&row, 100), Some(payload(rid * 100)));
772 }
773 }
774 })
775 })
776 .collect();
777
778 for h in handles {
779 h.join().unwrap();
780 }
781 }
782
783 /// The store's clock is the same `Arc` callers handed in — a
784 /// later 11.3 wiring change in `Database` relies on this.
785 #[test]
786 fn store_shares_caller_clock() {
787 let clock = Arc::new(MvccClock::new(42));
788 let store = MvStore::new(Arc::clone(&clock));
789 assert_eq!(store.clock().now(), 42);
790 clock.tick(); // clock.tick now == 43
791 assert_eq!(store.clock().now(), 43);
792 }
793
794 // -----------------------------------------------------------------
795 // Phase 11.6 — garbage collection
796 // -----------------------------------------------------------------
797
798 /// With no active readers, the watermark is `u64::MAX` —
799 /// every committed-and-superseded version is reclaimable.
800 /// Latest version (end = None) and in-flight versions stay.
801 #[test]
802 fn active_watermark_is_max_with_no_readers() {
803 let (store, _clock) = MvStore::fresh();
804 assert_eq!(store.active_watermark(), u64::MAX);
805 }
806
807 /// With at least one in-flight transaction, the watermark
808 /// drops to that tx's `begin_ts` (the smallest one across
809 /// the active set).
810 #[test]
811 fn active_watermark_tracks_oldest_in_flight_tx() {
812 let (store, clock) = MvStore::fresh();
813 let h1 = store.active_registry().register(&clock); // begin_ts = 1
814 assert_eq!(store.active_watermark(), 1);
815 let h2 = store.active_registry().register(&clock); // begin_ts = 2
816 assert_eq!(store.active_watermark(), 1);
817 drop(h1);
818 assert_eq!(store.active_watermark(), 2);
819 drop(h2);
820 assert_eq!(store.active_watermark(), u64::MAX);
821 }
822
823 /// `gc_chain` reclaims versions whose `end` timestamp is at or
824 /// below the watermark. The latest (end = None) stays, by
825 /// definition.
826 #[test]
827 fn gc_chain_reclaims_versions_below_watermark() {
828 let (store, _clock) = MvStore::fresh();
829 let row = RowID::new("t", 1);
830 // Build a chain of three committed versions.
831 store
832 .push_committed(row.clone(), RowVersion::committed(1, payload(1)))
833 .unwrap();
834 store
835 .push_committed(row.clone(), RowVersion::committed(5, payload(2)))
836 .unwrap();
837 store
838 .push_committed(row.clone(), RowVersion::committed(9, payload(3)))
839 .unwrap();
840 // After the third push, chain is:
841 // v1 begin=1 end=5
842 // v2 begin=5 end=9
843 // v3 begin=9 end=None
844 assert_eq!(store.total_versions(), 3);
845
846 // Watermark = 5 reclaims v1 (end=5 <= 5). v2 (end=9) is
847 // still possibly visible. v3 (end=None) always stays.
848 let reclaimed = store.gc_chain(&row, 5);
849 assert_eq!(reclaimed, 1);
850 assert_eq!(store.total_versions(), 2);
851
852 // Watermark = MAX reclaims everything that's been
853 // superseded; v3 (end=None) stays.
854 let reclaimed = store.gc_chain(&row, u64::MAX);
855 assert_eq!(reclaimed, 1);
856 assert_eq!(store.total_versions(), 1);
857 }
858
859 /// `gc_chain` drops the row's outer-map entry entirely when
860 /// the chain becomes empty. Cheap and prevents long-running
861 /// sessions from accumulating empty rows.
862 #[test]
863 fn gc_chain_drops_empty_chain_from_map() {
864 let (store, _clock) = MvStore::fresh();
865 let row = RowID::new("t", 1);
866 // Push two versions where the latest one will *also* be
867 // reclaimable: a tombstone capped (artificially) by a
868 // committed end so the whole chain becomes reclaimable.
869 // We can't actually reach this state through the public
870 // API in v0 (the latest version always has end=None), but
871 // the test exercises the empty-chain cleanup branch
872 // explicitly.
873 store
874 .push_committed(row.clone(), RowVersion::committed(1, payload(1)))
875 .unwrap();
876 store
877 .push_committed(
878 row.clone(),
879 RowVersion::committed(5, VersionPayload::Tombstone),
880 )
881 .unwrap();
882 // Forcibly cap the tombstone's end so the whole chain is
883 // reclaimable. (Reaches into the internals — fine for a
884 // unit test, the production path goes through `push_committed`
885 // which always leaves the latest end=None.)
886 {
887 let map = store.inner.versions.lock().unwrap();
888 let chain_arc = map.get(&row).unwrap().clone();
889 drop(map);
890 let mut chain = chain_arc.write().unwrap();
891 if let Some(last) = chain.last_mut() {
892 last.end = Some(TxTimestampOrId::Timestamp(10));
893 }
894 }
895 assert_eq!(store.tracked_rows(), 1);
896
897 let reclaimed = store.gc_chain(&row, u64::MAX);
898 assert_eq!(reclaimed, 2);
899 // Empty chain → row removed from outer map.
900 assert_eq!(store.tracked_rows(), 0);
901 }
902
903 /// `gc_all` sweeps every row's chain in one pass. Returns
904 /// the total versions reclaimed across the store.
905 #[test]
906 fn gc_all_sweeps_every_row() {
907 let (store, _clock) = MvStore::fresh();
908 for rid in 0..4 {
909 let row = RowID::new("t", rid);
910 store
911 .push_committed(row.clone(), RowVersion::committed(1, payload(rid)))
912 .unwrap();
913 store
914 .push_committed(row.clone(), RowVersion::committed(2, payload(rid * 10)))
915 .unwrap();
916 }
917 // Each row has 2 versions: one with end=2 (reclaimable
918 // at high watermark), one with end=None (kept).
919 assert_eq!(store.total_versions(), 8);
920
921 let reclaimed = store.gc_all(u64::MAX);
922 assert_eq!(reclaimed, 4);
923 assert_eq!(store.total_versions(), 4);
924 assert_eq!(store.tracked_rows(), 4);
925 }
926
927 /// GC must not reclaim versions visible to an active reader.
928 /// A reader with `begin_ts = 5` can see versions where
929 /// `begin <= 5 < end`, so versions with `end > 5` must be
930 /// kept.
931 #[test]
932 fn gc_preserves_versions_visible_to_active_readers() {
933 let (store, clock) = MvStore::fresh();
934 let row = RowID::new("t", 1);
935 store
936 .push_committed(row.clone(), RowVersion::committed(1, payload(1)))
937 .unwrap();
938 store
939 .push_committed(row.clone(), RowVersion::committed(10, payload(2)))
940 .unwrap();
941 // Open a reader at begin_ts = 5. (The clock is at 1 right
942 // now from the registers above; bump it past 5 then take
943 // a snapshot via observe.)
944 clock.observe(4);
945 let reader = store.active_registry().register(&clock); // begin_ts = 5
946 assert_eq!(reader.begin_ts(), 5);
947 assert_eq!(store.active_watermark(), 5);
948
949 // The version with end = 10 is still visible to the
950 // reader — must NOT be reclaimed.
951 let reclaimed = store.gc_chain(&row, store.active_watermark());
952 assert_eq!(reclaimed, 0);
953
954 // Reader at begin_ts = 5 sees v1 (begin=1, end=10
955 // satisfies 1 <= 5 < 10).
956 assert_eq!(store.read(&row, 5), Some(payload(1)));
957
958 // Once the reader closes, the watermark jumps to MAX and
959 // v1 becomes reclaimable.
960 drop(reader);
961 let reclaimed = store.gc_chain(&row, store.active_watermark());
962 assert_eq!(reclaimed, 1);
963 }
964
965 /// Many sequential commits to the same row with no active
966 /// reader: chain length stays bounded under per-row GC.
967 /// (`bounded` here = "1 latest version" — every prior version
968 /// is reclaimable because nobody can see them.)
969 #[test]
970 fn gc_keeps_chain_bounded_under_repeated_updates() {
971 let (store, _clock) = MvStore::fresh();
972 let row = RowID::new("t", 1);
973 for ts in 1..=100u64 {
974 store
975 .push_committed(row.clone(), RowVersion::committed(ts, payload(ts as i64)))
976 .unwrap();
977 // Per-update GC sweep at the current watermark
978 // (u64::MAX since no readers).
979 store.gc_chain(&row, store.active_watermark());
980 }
981 // Only the latest version (end = None) survives.
982 assert_eq!(store.total_versions(), 1);
983 }
984}