sqlrite-engine 0.10.0

Light version of SQLite developed with Rust. Published as `sqlrite-engine` on crates.io; import as `use sqlrite::…`.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
//! [`MvStore`] — the in-memory version index sitting in front of
//! the pager (Phase 11.3 skeleton).
//!
//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
//!
//! > The MVCC store keeps an in-memory map keyed by `RowID
//! > { table_id, row_key }` whose value is a chain of `RowVersion`
//! > records. Each version carries `begin`/`end` timestamps and the
//! > row payload itself. Visibility for a reader transaction with
//! > begin-timestamp `T` is the textbook snapshot-isolation rule:
//! > pick the version whose `begin <= T < end`.
//!
//! Phase 11.3 lands the standalone data structures + visibility
//! logic so 11.4 can plug them into:
//!
//! - the **executor's read path** when the connection is in MVCC
//!   journal mode (the [`super::JournalMode`] enum);
//! - the **commit path**, which mirrors successful writes from the
//!   legacy `Database::tables` map into the MvStore at the assigned
//!   `commit_ts` and ends the previous latest version at the same
//!   timestamp.
//!
//! Today nothing in the executor calls into this module. The
//! `PRAGMA journal_mode = mvcc` switch parses but doesn't change
//! query behaviour. That's intentional — committing to a half-wired
//! read path before the write side exists would force 11.4's
//! commit-validation work into this PR. The two are coupled and
//! ship together.
//!
//! ## Why one outer `Mutex` plus a per-chain `RwLock` rather than a lock-free design
//!
//! v0 stores each row's version chain inside an
//! `Arc<RwLock<Vec<RowVersion>>>`. The outer map is a
//! `Mutex<HashMap<RowID, _>>`. Two reasons not to over-engineer:
//!
//! 1. The plan-doc explicitly calls this out:
//!    > One chain per row, behind `RwLock` (or `parking_lot::RwLock`).
//!    > The wait-free chain is a known follow-up; it's not on the v0
//!    > critical path.
//! 2. The hot path is `MvStore::read`, which takes the outer lock to
//!    fetch the `Arc<RwLock<…>>`, drops it, then takes the chain's
//!    `RwLock` in read mode for the visibility scan. The outer lock
//!    is held only long enough to clone an `Arc`.
//!
//! When the commit path lands (11.4) and we observe contention, a
//! sharded outer map (e.g. `dashmap`) becomes the obvious upgrade —
//! same `RowID → chain` shape, just multiple shards. None of
//! `MvStore`'s public surface assumes the inner storage shape, so
//! the swap is local.

use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

use crate::sql::db::table::Value;

use super::clock::MvccClock;
use super::registry::{ActiveTxRegistry, TxTimestampOrId};

/// Key that identifies one row inside the MvStore.
///
/// v0 pairs the owning table's *name* with the row's `rowid`,
/// because the engine doesn't yet have a stable numeric `table_id`
/// (the schema catalog is keyed by name). Once 11.5 introduces a
/// numeric table id (likely alongside the checkpoint integration,
/// so the index stops carrying a `String` per row), this becomes
/// `(u32, i64)`. Consumers only hash and compare `RowID`s, so that
/// change stays local to this type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RowID {
    /// Name of the table the row belongs to.
    pub table: String,
    /// Row identifier within that table.
    pub rowid: i64,
}

impl RowID {
    /// Builds a key from anything convertible into an owned table
    /// name (`&str`, `String`, …) plus the row's `rowid`.
    pub fn new(table: impl Into<String>, rowid: i64) -> Self {
        let table: String = table.into();
        RowID { table, rowid }
    }
}

/// What a [`RowVersion`] records. `Present` carries the row's
/// column values at the moment of commit; `Tombstone` records that
/// the row was deleted at this version's `begin` timestamp.
///
/// Column-value pairs are stored as a `Vec<(String, Value)>` rather
/// than a `BTreeMap<String, Value>` because:
/// - The vector preserves declaration order (stable for tests +
///   diagnostics).
/// - Lookups by column are rare on this path — the executor walks
///   the row by projection order.
#[derive(Debug, Clone, PartialEq)]
pub enum VersionPayload {
    /// Row exists with the given column-value pairs.
    Present(Vec<(String, Value)>),
    /// Row was deleted at this version's `begin` timestamp. Readers
    /// that can see this version see "no such row"; readers older
    /// than `begin` still see whatever the previous version held.
    /// [`MvStore::read`] hands the tombstone back as
    /// `Some(Tombstone)` — treating it as a missing row is up to
    /// the caller.
    Tombstone,
}

/// One link in a row's version chain.
///
/// Visibility under snapshot isolation is the textbook rule the
/// Hekaton paper formalises and Turso's MVCC implements:
///
/// - `begin <= T`: the version was committed at or before the
///   reader's begin-timestamp. (For an in-flight version
///   `begin = Id(tx)`, only the producing transaction can see it.)
/// - `end > T` or `end is None`: the version hasn't been superseded
///   yet from the reader's point of view.
///
/// Both conditions must hold. See [`MvStore::visible_at`].
#[derive(Debug, Clone)]
pub struct RowVersion {
    /// When the version became (or will become) visible:
    /// `Timestamp(commit_ts)` for a committed version, `Id(tx)` for
    /// one that is still in flight.
    pub begin: TxTimestampOrId,
    /// When the version was superseded. `None` means it is still
    /// the latest; `Some(Timestamp(_))` is a committed cap;
    /// `Some(Id(_))` is a cap held by a transaction that has not
    /// committed yet.
    pub end: Option<TxTimestampOrId>,
    /// Row contents (or a tombstone) recorded by this version.
    pub payload: VersionPayload,
}

impl RowVersion {
    /// Builds a freshly-committed version at `commit_ts` with no
    /// `end` (i.e. currently latest). This is the shape the legacy
    /// commit path produces in 11.4 when it mirrors a row write.
    pub fn committed(commit_ts: u64, payload: VersionPayload) -> Self {
        Self {
            begin: TxTimestampOrId::Timestamp(commit_ts),
            end: None,
            payload,
        }
    }

    /// Builds an in-flight version owned by `tx_id`. v0 tests use
    /// this to construct chains by hand; the production write path
    /// (11.4) will own it.
    pub fn in_flight(tx_id: super::TxId, payload: VersionPayload) -> Self {
        Self {
            begin: TxTimestampOrId::Id(tx_id),
            end: None,
            payload,
        }
    }
}

/// A row's version chain, oldest first: both
/// [`MvStore::push_committed`] and [`MvStore::push_in_flight`]
/// append at the back, so the newest version is the last element.
/// [`MvStore::latest_committed_begin`] walks the chain back to
/// front; [`MvStore::read`] currently scans it front to back.
pub type RowVersionChain = Vec<RowVersion>;

/// In-memory MVCC version index. Cheap to clone — the heavy state
/// is behind `Arc`s, so every clone refers to the same version
/// map, clock and active-transaction registry.
#[derive(Clone, Debug)]
pub struct MvStore {
    /// Shared state; cloning the store only clones this `Arc`.
    inner: Arc<MvStoreInner>,
}

/// State shared by every clone of an [`MvStore`].
#[derive(Debug)]
struct MvStoreInner {
    /// `RowID → version chain`. Outer `Mutex` guards the map's
    /// shape (insert / lookup); the per-chain `RwLock` guards the
    /// `Vec` (so two readers walking different chains don't fight,
    /// and the writer that ends the latest version doesn't block
    /// readers on other chains).
    versions: Mutex<HashMap<RowID, Arc<RwLock<RowVersionChain>>>>,
    /// Clock supplied to [`MvStore::new`] and exposed through
    /// [`MvStore::clock`].
    clock: Arc<MvccClock>,
    /// Registry of active transactions, exposed through
    /// [`MvStore::active_registry`]; [`MvStore::active_watermark`]
    /// derives the GC watermark from it.
    active: ActiveTxRegistry,
}

impl MvStore {
    /// Creates a store with no tracked rows, bound to the given
    /// shared `clock` and a brand-new active-transaction registry.
    ///
    /// Phase 11.3 wires this into `Database` so every connection
    /// observes the same version index; 11.2's
    /// `Wal::clock_high_water` seeds the clock at open time.
    pub fn new(clock: Arc<MvccClock>) -> Self {
        let inner = MvStoreInner {
            versions: Mutex::new(HashMap::new()),
            clock,
            active: ActiveTxRegistry::new(),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Test / standalone helper: allocates a new clock seeded at 0
    /// and a store that shares it. The clock is handed back as the
    /// second tuple element so the caller can `tick()` it to
    /// allocate timestamps for hand-built versions.
    pub fn fresh() -> (Self, Arc<MvccClock>) {
        let clock = Arc::new(MvccClock::new(0));
        (Self::new(Arc::clone(&clock)), clock)
    }

    /// Returns the shared clock. The same `Arc` every consumer
    /// (commit path, read path, GC) holds.
    pub fn clock(&self) -> &Arc<MvccClock> {
        &self.inner.clock
    }

    /// Returns the active-transaction registry. Phase 11.4 will
    /// register `BEGIN CONCURRENT` transactions here; Phase 11.6
    /// reads `min_active_begin_ts()` to set the GC watermark.
    pub fn active_registry(&self) -> &ActiveTxRegistry {
        &self.inner.active
    }

    /// Counts the entries in the outer map, i.e. the rows that
    /// currently have a chain. Cheap diagnostic: only the outer
    /// map lock is taken, and only for the duration of the call.
    pub fn tracked_rows(&self) -> usize {
        let rows = self.lock_map();
        rows.len()
    }

    /// Sums the lengths of all version chains. Linear in the
    /// number of rows and holds the outer map lock for the whole
    /// walk, so it is meant for tests + assertions rather than the
    /// hot path.
    ///
    /// # Panics
    /// Panics if any chain's `RwLock` is poisoned.
    pub fn total_versions(&self) -> usize {
        let rows = self.lock_map();
        let mut total = 0;
        for chain in rows.values() {
            total += chain.read().expect("chain RwLock poisoned").len();
        }
        total
    }

    /// Looks up the payload of `row_id` as seen by a reader whose
    /// begin-timestamp is `begin_ts`. Returns `None` when the row
    /// has no chain or no version in the chain passes
    /// [`MvStore::visible_at`].
    ///
    /// Snapshot-isolation visibility:
    /// - the version's `begin` is a committed timestamp
    ///   `<= begin_ts`, and
    /// - the version's `end` is `None` (still latest), an
    ///   in-flight cap, or a committed timestamp `> begin_ts`.
    ///
    /// In-flight versions (`begin = Id(_)`) are never returned:
    /// they are placeholders until the producing transaction
    /// commits (its `begin` becomes a `Timestamp`) or aborts (the
    /// version is dropped). The producing transaction reads its
    /// own writes through a separate path (Phase 11.4), not
    /// through this function.
    ///
    /// The chain is walked oldest-to-newest and the first visible
    /// version wins. A tombstone is returned like any other
    /// payload (`Some(VersionPayload::Tombstone)`).
    ///
    /// # Panics
    /// Panics if the chain's `RwLock` is poisoned.
    pub fn read(&self, row_id: &RowID, begin_ts: u64) -> Option<VersionPayload> {
        // The outer map guard is a temporary of this statement, so
        // the map is locked only long enough to clone the `Arc`.
        let slot = Arc::clone(self.lock_map().get(row_id)?);
        let versions = slot.read().expect("chain RwLock poisoned");
        versions
            .iter()
            .find(|candidate| Self::visible_at(candidate, begin_ts))
            .map(|hit| hit.payload.clone())
    }

    /// Snapshot-isolation visibility test for a single version.
    /// Pure function — exposed for tests + future GC code.
    ///
    /// `version` is visible to a reader with begin-timestamp
    /// `begin_ts` when both hold:
    /// - its `begin` is a committed `Timestamp(t)` with
    ///   `t <= begin_ts` (an in-flight `Id(_)` begin is never
    ///   visible here);
    /// - its `end` is *not* a committed `Timestamp(t)` with
    ///   `t <= begin_ts`. `None` means the version is still the
    ///   latest, and an in-flight `Id(_)` cap means another
    ///   transaction is superseding it but hasn't committed, so
    ///   from the reader's perspective it is still the latest.
    pub fn visible_at(version: &RowVersion, begin_ts: u64) -> bool {
        let committed_in_time =
            matches!(version.begin, TxTimestampOrId::Timestamp(t) if t <= begin_ts);
        let capped_in_time =
            matches!(version.end, Some(TxTimestampOrId::Timestamp(t)) if t <= begin_ts);
        committed_in_time && !capped_in_time
    }

    /// Finds the begin-timestamp of the newest committed version
    /// of `row_id`. Returns `None` when the row has no chain, or
    /// when its chain is empty or holds only in-flight
    /// placeholders.
    ///
    /// Phase 11.4 — the commit-validation pass calls this for
    /// every row in its write-set. If the latest committed begin
    /// is greater than the validating transaction's `begin_ts`,
    /// some other transaction superseded the row after our
    /// snapshot — abort with [`crate::error::SQLRiteError::Busy`].
    ///
    /// # Panics
    /// Panics if the chain's `RwLock` is poisoned.
    pub fn latest_committed_begin(&self, row_id: &RowID) -> Option<u64> {
        // Map guard is dropped at the end of this statement.
        let slot = Arc::clone(self.lock_map().get(row_id)?);
        let versions = slot.read().expect("chain RwLock poisoned");
        // Newest versions sit at the back, so walk in reverse and
        // stop at the first one whose `begin` is a committed
        // timestamp; in-flight (`Id(_)`) versions aren't published
        // yet and are skipped.
        for version in versions.iter().rev() {
            if let TxTimestampOrId::Timestamp(ts) = version.begin {
                return Some(ts);
            }
        }
        None
    }

    /// Appends a committed `version` to `row_id`'s chain (creating
    /// the chain if needed) and, when the chain's current tail is
    /// still uncapped, sets that tail's `end` to `version.begin` —
    /// the canonical write-side bookkeeping the commit path will
    /// use in 11.4.
    ///
    /// Use [`MvStore::push_in_flight`] for in-flight versions;
    /// commit will rewrite their `begin` later.
    ///
    /// # Errors
    /// - [`MvStoreError::NotCommitted`] — `version.begin` is an
    ///   in-flight `Id(_)` rather than a `Timestamp`.
    /// - [`MvStoreError::NonMonotonicBegin`] — the new begin is
    ///   `<=` the tail's begin (the commit path must always hand
    ///   out increasing timestamps via the `MvccClock`).
    /// - [`MvStoreError::PreviousAlreadyCapped`] — the tail is
    ///   already capped at a *different* committed timestamp.
    /// - [`MvStoreError::PreviousCappedByInFlight`] — the tail's
    ///   `end` is an in-flight cap.
    ///
    /// On every error the chain is left exactly as it was.
    ///
    /// # Panics
    /// Panics if the chain's `RwLock` is poisoned.
    pub fn push_committed(&self, row_id: RowID, version: RowVersion) -> Result<(), MvStoreError> {
        let new_begin = match version.begin {
            TxTimestampOrId::Id(_) => return Err(MvStoreError::NotCommitted),
            TxTimestampOrId::Timestamp(ts) => ts,
        };
        let slot = self.get_or_create_chain(row_id);
        let mut versions = slot.write().expect("chain RwLock poisoned");
        if let Some(tail) = versions.last_mut() {
            // An in-flight tail has no committed begin; it is
            // compared as 0, so only `new_begin == 0` is rejected
            // against it.
            // NOTE(review): in that case the cap below lands on the
            // in-flight tail itself, not on the last committed
            // version — callers are expected to remove in-flight
            // versions before committing.
            let tail_begin = match tail.begin {
                TxTimestampOrId::Timestamp(ts) => ts,
                TxTimestampOrId::Id(_) => 0,
            };
            if new_begin <= tail_begin {
                return Err(MvStoreError::NonMonotonicBegin {
                    prev: tail_begin,
                    new: new_begin,
                });
            }
            // The only mutation is in the `None` arm, after every
            // check has passed, so a rejected push never leaves the
            // chain half-capped.
            match tail.end {
                None => tail.end = Some(TxTimestampOrId::Timestamp(new_begin)),
                Some(TxTimestampOrId::Timestamp(existing)) if existing == new_begin => {
                    // Idempotent replay — already capped at exactly
                    // this timestamp (recovery path will hit this).
                }
                Some(TxTimestampOrId::Timestamp(existing)) => {
                    return Err(MvStoreError::PreviousAlreadyCapped { existing });
                }
                Some(TxTimestampOrId::Id(_)) => {
                    // Another transaction owns the supersession.
                    // 11.4's validation should prevent this; v0
                    // reports it instead of silently overwriting.
                    return Err(MvStoreError::PreviousCappedByInFlight);
                }
            }
        }
        versions.push(version);
        Ok(())
    }

    /// Appends `version` to `row_id`'s chain (creating the chain
    /// if needed) without any validation or capping of the
    /// previous tail.
    ///
    /// Intended for the 11.4 write path while a `BEGIN CONCURRENT`
    /// transaction is open: on commit the version's `begin` is
    /// rewritten from `Id(tx)` to `Timestamp(commit_ts)` and the
    /// previous latest is capped at that timestamp (via
    /// [`Self::push_committed`], after the in-flight version has
    /// been removed). 11.3 ships this as standalone API for tests;
    /// 11.4 wires it into the executor.
    ///
    /// # Panics
    /// Panics if the chain's `RwLock` is poisoned.
    pub fn push_in_flight(&self, row_id: RowID, version: RowVersion) {
        self.get_or_create_chain(row_id)
            .write()
            .expect("chain RwLock poisoned")
            .push(version);
    }

    // -----------------------------------------------------------------
    // Phase 11.6 — garbage collection
    // -----------------------------------------------------------------

    /// Computes the GC watermark — the timestamp at or below which
    /// a committed `end` makes a version reclaimable.
    ///
    /// - With at least one active transaction, the watermark is
    ///   the smallest `begin_ts` in the active set. A version
    ///   whose `end` is `> watermark` may still be visible to that
    ///   reader and must be kept.
    /// - With no active transaction, the watermark is `u64::MAX`.
    ///   [`MvStore::gc_chain`] keeps only versions whose committed
    ///   `end` is strictly greater than the watermark, so every
    ///   version with a committed `end` is reclaimed and the
    ///   latest version per row (`end == None`) stays.
    pub fn active_watermark(&self) -> u64 {
        match self.inner.active.min_active_begin_ts() {
            Some(oldest_begin) => oldest_begin,
            None => u64::MAX,
        }
    }

    /// Garbage-collects `row_id`'s version chain against
    /// `watermark`. A version is reclaimed when its `end` is a
    /// committed timestamp `<= watermark` — at that point no
    /// reader at or above the watermark has a `begin_ts` inside
    /// the half-open `[begin, end)` interval that the
    /// snapshot-isolation rule requires for visibility. Versions
    /// with `end == None` (latest) or an in-flight `Id(_)` cap are
    /// always kept.
    ///
    /// Returns the number of versions reclaimed (0 if the row has
    /// no chain). If the sweep leaves the chain empty, the row's
    /// entry is removed from the outer map so it doesn't leak
    /// memory — but only when no other thread can still be holding
    /// the chain (see the comment in the body).
    ///
    /// # Panics
    /// Panics if the chain's `RwLock` is poisoned.
    pub fn gc_chain(&self, row_id: &RowID, watermark: u64) -> usize {
        let chain_arc = match self.lock_map().get(row_id) {
            Some(arc) => Arc::clone(arc),
            None => return 0,
        };
        let reclaimed = {
            let mut chain = chain_arc.write().expect("chain RwLock poisoned");
            let before = chain.len();
            chain.retain(|v| match v.end {
                // Committed-end timestamp at or below the watermark
                // is reclaimable; anything strictly above it is
                // still possibly visible to a reader.
                Some(TxTimestampOrId::Timestamp(t)) => t > watermark,
                // Latest version and in-flight caps are always kept.
                None | Some(TxTimestampOrId::Id(_)) => true,
            });
            before - chain.len()
        };
        if reclaimed > 0 {
            // Decide about dropping the entry while holding the
            // outer map lock. Every handle to a chain is cloned out
            // of the map under that lock, so while we hold it:
            //
            // - `ptr_eq` confirms the entry is still the chain we
            //   swept (not one re-created after a concurrent
            //   removal);
            // - `strong_count == 2` (the map's handle + ours) proves
            //   no reader or writer holds the chain, and none can
            //   obtain it until we release the map.
            //
            // Without this, a `push_committed` / `push_in_flight`
            // that cloned the `Arc` before the removal would push
            // into a chain no longer reachable from the map and the
            // version would be silently lost. If another handle is
            // live we simply leave the (possibly empty) entry for a
            // later sweep.
            let mut map = self.lock_map();
            let still_current = map
                .get(row_id)
                .map_or(false, |current| Arc::ptr_eq(current, &chain_arc));
            if still_current
                && Arc::strong_count(&chain_arc) == 2
                && chain_arc
                    .read()
                    .expect("chain RwLock poisoned")
                    .is_empty()
            {
                map.remove(row_id);
            }
        }
        reclaimed
    }

    /// Runs [`MvStore::gc_chain`] over every row currently in the
    /// store and returns the total number of versions reclaimed.
    /// Used by [`crate::Connection::vacuum_mvcc`] for an explicit
    /// full drain; per-commit callers should prefer `gc_chain`
    /// over the rows they actually touched.
    pub fn gc_all(&self, watermark: u64) -> usize {
        // Copy the keys first: the map guard is released at the end
        // of this statement, so the outer lock is not held across
        // the per-chain work and concurrent `push_committed` /
        // `read` calls aren't blocked for the whole sweep.
        let snapshot: Vec<RowID> = self.lock_map().keys().cloned().collect();
        let mut reclaimed = 0;
        for row_id in &snapshot {
            reclaimed += self.gc_chain(row_id, watermark);
        }
        reclaimed
    }

    /// Returns a handle to `row_id`'s chain, inserting an empty
    /// chain into the outer map first if the row has none. The
    /// outer map lock is released before the handle is returned.
    fn get_or_create_chain(&self, row_id: RowID) -> Arc<RwLock<RowVersionChain>> {
        let mut rows = self.lock_map();
        // `Arc<RwLock<Vec<_>>>`'s `Default` is an empty chain.
        let slot = rows.entry(row_id).or_default();
        Arc::clone(slot)
    }

    /// Locks the outer `RowID → chain` map.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap<RowID, Arc<RwLock<RowVersionChain>>>> {
        match self.inner.versions.lock() {
            Ok(guard) => guard,
            Err(e) => panic!("sqlrite: MvStore versions mutex poisoned: {e}"),
        }
    }
}

/// Errors returned by mutating MvStore operations. Read-side calls
/// (`read`, `visible_at`) don't error. Every variant is currently
/// produced by [`MvStore::push_committed`], which leaves the chain
/// unmodified whenever it returns one of them.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum MvStoreError {
    /// `push_committed` got a version whose `begin` is an in-flight
    /// `TxId` rather than a committed `Timestamp`.
    #[error("push_committed expects a committed Timestamp, not an in-flight TxId")]
    NotCommitted,

    /// The previous latest version is already capped at a different
    /// timestamp. Either the caller is double-committing, or the
    /// commit path is racing with itself (which 11.4's commit-validation
    /// loop is supposed to prevent). `existing` is the cap that was
    /// found.
    #[error("previous latest version already capped at end_ts={existing}")]
    PreviousAlreadyCapped { existing: u64 },

    /// The previous latest's `end` is set to an in-flight cap. v0
    /// rejects rather than silently overwriting; 11.4's commit
    /// validation runs first so this shouldn't fire in production.
    #[error("previous latest version is being capped by an in-flight transaction")]
    PreviousCappedByInFlight,

    /// New version's `begin` is not strictly greater than the
    /// previous latest's `begin`. The clock should always hand out
    /// monotonically increasing timestamps; this is a corruption /
    /// bug indicator. `prev` is the existing tail's begin (0 when
    /// the tail is in flight), `new` is the rejected begin.
    #[error("non-monotonic begin: previous={prev}, new={new}")]
    NonMonotonicBegin { prev: u64, new: u64 },
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test helper: wraps `value` in a `Present` payload holding a
    /// single `("v", Value::Integer(value))` entry.
    fn payload(value: i64) -> VersionPayload {
        let entries = vec![(String::from("v"), Value::Integer(value))];
        VersionPayload::Present(entries)
    }

    /// A fresh store has nothing to read and tracks no rows or
    /// versions. The counters are checked *after* the read, so a
    /// read that created a map entry as a side effect would be
    /// caught here.
    #[test]
    fn empty_store_returns_none() {
        let (store, _clock) = MvStore::fresh();
        let unknown_row = RowID::new("t", 1);
        assert_eq!(store.read(&unknown_row, 100), None);
        assert_eq!(store.tracked_rows(), 0);
        assert_eq!(store.total_versions(), 0);
    }

    /// The headline snapshot-isolation rule: with two committed
    /// versions of one row, a reader at `begin_ts = T` gets exactly
    /// the version satisfying `begin <= T < end`.
    #[test]
    fn visibility_picks_the_right_version_for_each_begin_ts() {
        let (store, clock) = MvStore::fresh();
        let row = RowID::new("accounts", 1);

        // Commit V1 at ts=5 and V2 at ts=10, advancing the clock
        // ahead of each push.
        for (commit_ts, value) in [(5, 100), (10, 200)] {
            clock.observe(commit_ts);
            store
                .push_committed(row.clone(), RowVersion::committed(commit_ts, payload(value)))
                .unwrap();
        }

        let expectations = [
            // Before V1 exists — nothing visible.
            (4, None),
            // Exactly at V1's begin — V1.
            (5, Some(payload(100))),
            // Between V1 and V2 — still V1 (V2's begin > T).
            (9, Some(payload(100))),
            // Exactly at V2's begin — V2.
            (10, Some(payload(200))),
            // Well past V2 — V2.
            (1_000, Some(payload(200))),
        ];
        for (begin_ts, expected) in expectations {
            assert_eq!(store.read(&row, begin_ts), expected);
        }
    }

    /// Pushing a newer committed version must cap the previous
    /// latest version's `end` at the newcomer's `begin`. If `end`
    /// stayed `None` on every version, the visibility rule would
    /// hand the oldest committed version to every reader.
    #[test]
    fn push_committed_caps_previous_latest() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 7);
        for (begin, value) in [(2, 1), (5, 2)] {
            store
                .push_committed(row.clone(), RowVersion::committed(begin, payload(value)))
                .unwrap();
        }
        // Observed through the public API only: a reader at ts=4
        // sits inside V1's [2, 5) window, which is only well-defined
        // if V1's end was set to Some(Timestamp(5)).
        let seen = store.read(&row, 4);
        assert_eq!(seen, Some(payload(1)));
    }

    /// `visible_at` takes only a version and a timestamp, so the
    /// rule is pinned here directly on hand-built versions without
    /// going through a chain.
    #[test]
    fn visible_at_handles_each_combination() {
        // Every case uses the same payload; only begin/end vary.
        let version = |begin: TxTimestampOrId, end: Option<TxTimestampOrId>| RowVersion {
            begin,
            end,
            payload: payload(0),
        };

        // Committed begin, open end: visible iff T >= begin.
        let open_ended = version(TxTimestampOrId::Timestamp(10), None);
        for (ts, expected) in [(9, false), (10, true), (1_000, true)] {
            assert_eq!(MvStore::visible_at(&open_ended, ts), expected);
        }

        // Committed begin and committed end: visible iff
        // begin <= T < end.
        let bounded = version(
            TxTimestampOrId::Timestamp(10),
            Some(TxTimestampOrId::Timestamp(20)),
        );
        for (ts, expected) in [(9, false), (10, true), (19, true), (20, false)] {
            assert_eq!(MvStore::visible_at(&bounded, ts), expected);
        }

        // In-flight begin: never visible to outside readers,
        // whatever `end` says.
        let uncommitted = version(TxTimestampOrId::Id(super::super::TxId(42)), None);
        for (ts, expected) in [(0, false), (1_000, false)] {
            assert_eq!(MvStore::visible_at(&uncommitted, ts), expected);
        }

        // Committed begin with an in-flight cap: the supersession
        // isn't committed yet, so the version stays visible — but
        // only to readers at or after its begin.
        let pending_cap = version(
            TxTimestampOrId::Timestamp(5),
            Some(TxTimestampOrId::Id(super::super::TxId(42))),
        );
        for (ts, expected) in [(10, true), (4, false)] {
            assert_eq!(MvStore::visible_at(&pending_cap, ts), expected);
        }
    }

    /// A delete is recorded as a `Tombstone` version. Readers whose
    /// snapshot predates the delete keep seeing the prior value;
    /// readers at or after it get the tombstone payload ("no row").
    #[test]
    fn tombstone_versions_capture_the_delete() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);

        let inserted = RowVersion::committed(1, payload(42));
        let deleted = RowVersion::committed(5, VersionPayload::Tombstone);
        store.push_committed(row.clone(), inserted).unwrap();
        store.push_committed(row.clone(), deleted).unwrap();

        let expectations = [
            (1, Some(payload(42))),
            (4, Some(payload(42))),
            (5, Some(VersionPayload::Tombstone)),
            (100, Some(VersionPayload::Tombstone)),
        ];
        for (begin_ts, expected) in expectations {
            assert_eq!(store.read(&row, begin_ts), expected);
        }
    }

    /// `push_committed` refuses a version whose `begin` is still an
    /// in-flight `TxId` and reports `NotCommitted`.
    #[test]
    fn push_committed_rejects_in_flight_begin() {
        let (store, _clock) = MvStore::fresh();
        let uncommitted = RowVersion::in_flight(super::super::TxId(7), payload(0));
        let outcome = store.push_committed(RowID::new("t", 1), uncommitted);
        let err = outcome.expect_err("in-flight begin must be rejected");
        assert_eq!(err, MvStoreError::NotCommitted);
    }

    /// A new version's `begin` must be strictly greater than the
    /// previous latest's: both an equal and a smaller `begin` are
    /// rejected with `NonMonotonicBegin`.
    #[test]
    fn push_committed_rejects_non_monotonic_begin() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(10, payload(1)))
            .unwrap();

        let rejected = [
            (10, "equal begin should be rejected"),
            (5, "backward begin should be rejected"),
        ];
        for (begin, why) in rejected {
            let err = store
                .push_committed(row.clone(), RowVersion::committed(begin, payload(2)))
                .expect_err(why);
            assert!(matches!(err, MvStoreError::NonMonotonicBegin { .. }));
        }
    }

    /// The snapshot-isolation contract Phase 11.4 relies on: a
    /// version that is still in flight is never returned to other
    /// readers. They keep seeing the previously committed version
    /// (or `None` when nothing else is in the chain).
    #[test]
    fn in_flight_versions_are_invisible_to_other_readers() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        store
            .push_committed(row.clone(), RowVersion::committed(5, payload(100)))
            .unwrap();

        // Stage an uncommitted write owned by a fresh TxId on top
        // of V1.
        let pending = RowVersion::in_flight(super::super::TxId(99), payload(200));
        store.push_in_flight(row.clone(), pending);

        // Whatever the reader's begin_ts, V1 is what it gets.
        for begin_ts in [5, 1_000] {
            assert_eq!(store.read(&row, begin_ts), Some(payload(100)));
        }
    }

    /// `tracked_rows` counts distinct rows and `total_versions`
    /// counts versions across all chains. Cheap sanity test; the
    /// GC tests further down lean on both counters.
    #[test]
    fn tracked_rows_and_total_versions_are_accurate() {
        let (store, _clock) = MvStore::fresh();
        // (table, rowid, begin): row ("a", 1) gets two versions,
        // the other two rows get one each.
        let pushes = [("a", 1, 1), ("a", 1, 2), ("a", 2, 1), ("b", 1, 1)];
        for (table, rowid, begin) in pushes {
            store
                .push_committed(
                    RowID::new(table, rowid),
                    RowVersion::committed(begin, payload(0)),
                )
                .unwrap();
        }
        assert_eq!(store.tracked_rows(), 3);
        assert_eq!(store.total_versions(), 4);
    }

    /// Compile-time check: `MvStore` must be both `Send` and
    /// `Sync`. The test body does nothing at runtime; it fails to
    /// build if either bound is lost.
    #[test]
    fn store_is_send_and_sync() {
        fn require_send_and_sync<T: Send + Sync>() {}
        require_send_and_sync::<MvStore>();
    }

    /// Readers walking different chains shouldn't block one
    /// another — the motivation for a per-chain `RwLock` over one
    /// big `Mutex<HashMap>`. Smoke test: several threads hammer
    /// `read` at once and each must observe the right version for
    /// its snapshot.
    #[test]
    fn concurrent_reads_see_consistent_snapshots() {
        use std::thread;

        let (store, _clock) = MvStore::fresh();
        // 32 rows, each with V1 at ts=1 and V2 at ts=10.
        for rid in 0..32 {
            let row = RowID::new("t", rid);
            let first = RowVersion::committed(1, payload(rid));
            let second = RowVersion::committed(10, payload(rid * 100));
            store.push_committed(row.clone(), first).unwrap();
            store.push_committed(row, second).unwrap();
        }

        let shared = Arc::new(store);
        let mut workers = Vec::with_capacity(8);
        for _ in 0..8 {
            let reader_store = Arc::clone(&shared);
            workers.push(thread::spawn(move || {
                for _ in 0..500 {
                    for rid in 0..32 {
                        let row = RowID::new("t", rid);
                        // Snapshot before the supersession: V1.
                        assert_eq!(reader_store.read(&row, 5), Some(payload(rid)));
                        // Snapshot after the supersession: V2.
                        assert_eq!(reader_store.read(&row, 100), Some(payload(rid * 100)));
                    }
                }
            }));
        }

        // A failed assertion in a worker surfaces here as a join
        // error.
        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// The store must hold the very `Arc` the caller passed in,
    /// not a copy of the clock — a later 11.3 wiring change in
    /// `Database` depends on it.
    #[test]
    fn store_shares_caller_clock() {
        let caller_clock = Arc::new(MvccClock::new(42));
        let store = MvStore::new(Arc::clone(&caller_clock));
        assert_eq!(store.clock().now(), 42);
        // Tick through the caller's handle (42 -> 43); the store's
        // view has to move with it.
        caller_clock.tick();
        assert_eq!(store.clock().now(), 43);
    }

    // -----------------------------------------------------------------
    // Phase 11.6 — garbage collection
    // -----------------------------------------------------------------

    /// No active readers means the watermark sits at `u64::MAX`,
    /// i.e. every committed-and-superseded version is reclaimable.
    /// The latest version (end = None) and in-flight versions
    /// stay regardless.
    #[test]
    fn active_watermark_is_max_with_no_readers() {
        let (store, _clock) = MvStore::fresh();
        let watermark = store.active_watermark();
        assert_eq!(watermark, u64::MAX);
    }

    /// While transactions are in flight, the watermark equals the
    /// smallest `begin_ts` in the active set; it moves up as the
    /// oldest ones finish and returns to `u64::MAX` once the set
    /// is empty.
    #[test]
    fn active_watermark_tracks_oldest_in_flight_tx() {
        let (store, clock) = MvStore::fresh();
        let registry = store.active_registry();

        // First registration: begin_ts = 1.
        let oldest = registry.register(&clock);
        assert_eq!(store.active_watermark(), 1);

        // Second registration: begin_ts = 2. The watermark stays
        // pinned to the older transaction.
        let newer = registry.register(&clock);
        assert_eq!(store.active_watermark(), 1);

        drop(oldest);
        assert_eq!(store.active_watermark(), 2);

        drop(newer);
        assert_eq!(store.active_watermark(), u64::MAX);
    }

    /// `gc_chain` drops versions whose `end` timestamp is at or
    /// below the watermark. The latest version (end = None) is
    /// never reclaimed.
    #[test]
    fn gc_chain_reclaims_versions_below_watermark() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);

        // Three committed versions, giving the chain:
        //   v1 begin=1 end=5
        //   v2 begin=5 end=9
        //   v3 begin=9 end=None
        for (begin, value) in [(1, 1), (5, 2), (9, 3)] {
            store
                .push_committed(row.clone(), RowVersion::committed(begin, payload(value)))
                .unwrap();
        }
        assert_eq!(store.total_versions(), 3);

        // Watermark 5: only v1 goes (end=5 <= 5). v2 (end=9) might
        // still be visible to someone; v3 is the latest.
        assert_eq!(store.gc_chain(&row, 5), 1);
        assert_eq!(store.total_versions(), 2);

        // Watermark MAX: every superseded version goes, which is
        // just v2 now. v3 (end=None) remains.
        assert_eq!(store.gc_chain(&row, u64::MAX), 1);
        assert_eq!(store.total_versions(), 1);
    }

    /// When a sweep leaves a chain empty, `gc_chain` also removes
    /// the row's entry from the outer map, so long-running
    /// sessions don't pile up empty rows.
    #[test]
    fn gc_chain_drops_empty_chain_from_map() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);

        // Build a value followed by a tombstone. Through the public
        // API the latest version always keeps end=None in v0, so a
        // fully reclaimable chain can't be reached that way; the
        // step below fakes it to exercise the empty-chain cleanup
        // branch explicitly.
        let inserted = RowVersion::committed(1, payload(1));
        let deleted = RowVersion::committed(5, VersionPayload::Tombstone);
        store.push_committed(row.clone(), inserted).unwrap();
        store.push_committed(row.clone(), deleted).unwrap();

        // Reach into the internals and put a committed end on the
        // tombstone so the whole chain becomes reclaimable. Fine for
        // a unit test; production goes through `push_committed`,
        // which always leaves the latest end=None.
        {
            // The map guard is a temporary here, so it is released
            // at the end of this statement — before the chain's
            // write lock is taken.
            let chain_arc = store.inner.versions.lock().unwrap().get(&row).cloned().unwrap();
            let mut versions = chain_arc.write().unwrap();
            if let Some(tombstone) = versions.last_mut() {
                tombstone.end = Some(TxTimestampOrId::Timestamp(10));
            }
        }
        assert_eq!(store.tracked_rows(), 1);

        assert_eq!(store.gc_chain(&row, u64::MAX), 2);
        // Chain is empty now, so the row is gone from the outer map.
        assert_eq!(store.tracked_rows(), 0);
    }

    /// `gc_all` visits every row's chain in a single call and
    /// reports the number of versions reclaimed store-wide.
    #[test]
    fn gc_all_sweeps_every_row() {
        let (store, _clock) = MvStore::fresh();
        for rid in 0..4 {
            let row = RowID::new("t", rid);
            let first = RowVersion::committed(1, payload(rid));
            let second = RowVersion::committed(2, payload(rid * 10));
            store.push_committed(row.clone(), first).unwrap();
            store.push_committed(row.clone(), second).unwrap();
        }
        // Four rows, two versions apiece: the older one capped at
        // end=2 (reclaimable under a high watermark), the newer one
        // with end=None (always kept).
        assert_eq!(store.total_versions(), 8);

        let reclaimed = store.gc_all(u64::MAX);

        // One version reclaimed per row; every row keeps its latest
        // version and therefore stays tracked.
        assert_eq!(reclaimed, 4);
        assert_eq!(store.total_versions(), 4);
        assert_eq!(store.tracked_rows(), 4);
    }

    /// GC must leave alone any version an active reader can still
    /// see. A reader at `begin_ts = 5` sees versions with
    /// `begin <= 5 < end`, so anything with `end > 5` has to
    /// survive the sweep.
    #[test]
    fn gc_preserves_versions_visible_to_active_readers() {
        let (store, clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        for (begin, value) in [(1, 1), (10, 2)] {
            store
                .push_committed(row.clone(), RowVersion::committed(begin, payload(value)))
                .unwrap();
        }

        // Advance the clock to 4 so the next `register` call hands
        // out begin_ts = 5 (asserted right below).
        clock.observe(4);
        let reader = store.active_registry().register(&clock);
        assert_eq!(reader.begin_ts(), 5);
        assert_eq!(store.active_watermark(), 5);

        // v1 has end = 10 > 5, so the reader can still see it and
        // the sweep must reclaim nothing.
        let watermark = store.active_watermark();
        assert_eq!(store.gc_chain(&row, watermark), 0);

        // And indeed a read at begin_ts = 5 still returns v1
        // (begin=1, end=10 satisfies 1 <= 5 < 10).
        assert_eq!(store.read(&row, 5), Some(payload(1)));

        // With the reader gone the watermark jumps to MAX and v1
        // becomes reclaimable.
        drop(reader);
        let watermark = store.active_watermark();
        assert_eq!(store.gc_chain(&row, watermark), 1);
    }

    /// A long run of commits to one row, with no active reader and
    /// a per-row GC sweep after each commit, must not grow the
    /// chain. "Bounded" here means exactly one version — the
    /// latest — because no reader can see any of the earlier ones.
    #[test]
    fn gc_keeps_chain_bounded_under_repeated_updates() {
        let (store, _clock) = MvStore::fresh();
        let row = RowID::new("t", 1);
        for ts in 1..=100u64 {
            let update = RowVersion::committed(ts, payload(ts as i64));
            store.push_committed(row.clone(), update).unwrap();
            // Sweep right after the update at the current
            // watermark, which is u64::MAX while nobody is reading.
            let watermark = store.active_watermark();
            store.gc_chain(&row, watermark);
        }
        // Only the latest version (end = None) is left.
        assert_eq!(store.total_versions(), 1);
    }
}