Skip to main content

sochdb_core/
version_chain.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Unified MVCC Version Chain Interface
19//!
20//! This module defines the canonical interface for MVCC version chains across SochDB.
21//! Multiple implementations exist for different subsystems, but they share these traits.
22//!
23//! ## Implementations
24//!
25//! | Implementation | Location | Use Case |
26//! |---------------|----------|----------|
27//! | `VersionChain` | `sochdb_core::epoch_gc` | Epoch-based GC with VecDeque |
28//! | `VersionChain` | `sochdb_storage::mvcc_snapshot` | Snapshot-based visibility |
29//! | `VersionChain` | `sochdb_storage::version_store` | Generic key-value MVCC |
30//! | `VersionChain` | `sochdb_storage::durable_storage` | Binary-search optimized |
31//!
32//! ## Visibility Semantics
33//!
34//! All implementations follow these MVCC visibility rules:
35//!
36//! 1. **Read Committed**: A version is visible if its creating transaction has committed
37//!    before the reader's start timestamp.
38//!
39//! 2. **Snapshot Isolation**: A version is visible if:
40//!    - It was committed before the reader's snapshot timestamp
41//!    - It was not deleted, or deleted after the snapshot timestamp
42//!
43//! 3. **Serializable (SSI)**: Adds read-write conflict detection on top of SI.
44
45/// Transaction identifier type
46pub type TxnId = u64;
47
48/// Logical timestamp type
49pub type Timestamp = u64;
50
51/// Version visibility context
52///
53/// Provides the information needed to determine if a version is visible
54/// to a particular reader/transaction.
55#[derive(Debug, Clone)]
56pub struct VisibilityContext {
57    /// Reader's transaction ID
58    pub reader_txn_id: TxnId,
59    /// Reader's snapshot timestamp
60    pub snapshot_ts: Timestamp,
61    /// Set of transaction IDs that are still active (not committed)
62    pub active_txn_ids: std::collections::HashSet<TxnId>,
63}
64
65impl VisibilityContext {
66    /// Create a new visibility context
67    pub fn new(reader_txn_id: TxnId, snapshot_ts: Timestamp) -> Self {
68        Self {
69            reader_txn_id,
70            snapshot_ts,
71            active_txn_ids: std::collections::HashSet::new(),
72        }
73    }
74
75    /// Create with active transaction set
76    pub fn with_active_txns(
77        reader_txn_id: TxnId,
78        snapshot_ts: Timestamp,
79        active_txn_ids: std::collections::HashSet<TxnId>,
80    ) -> Self {
81        Self {
82            reader_txn_id,
83            snapshot_ts,
84            active_txn_ids,
85        }
86    }
87
88    /// Check if a transaction was committed before this snapshot
89    pub fn is_committed_before(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
90        match commit_ts {
91            Some(ts) => ts < self.snapshot_ts && !self.active_txn_ids.contains(&txn_id),
92            None => false,
93        }
94    }
95}
96
97/// Version metadata
98///
99/// Common metadata for all version chain implementations.
100#[derive(Debug, Clone)]
101pub struct VersionMeta {
102    /// Transaction that created this version
103    pub created_by: TxnId,
104    /// Timestamp when this version was created
105    pub created_ts: Timestamp,
106    /// Transaction that deleted this version (0 = not deleted)
107    pub deleted_by: TxnId,
108    /// Timestamp when this version was deleted (0 = not deleted)
109    pub deleted_ts: Timestamp,
110    /// Commit timestamp (0 = not yet committed)
111    pub commit_ts: Timestamp,
112}
113
114impl VersionMeta {
115    /// Create metadata for a new uncommitted version
116    pub fn new_uncommitted(created_by: TxnId, created_ts: Timestamp) -> Self {
117        Self {
118            created_by,
119            created_ts,
120            deleted_by: 0,
121            deleted_ts: 0,
122            commit_ts: 0,
123        }
124    }
125
126    /// Check if this version is visible according to the context
127    pub fn is_visible(&self, ctx: &VisibilityContext) -> bool {
128        // Must be committed before snapshot
129        if self.commit_ts == 0 {
130            // Uncommitted - only visible to creating transaction
131            return self.created_by == ctx.reader_txn_id;
132        }
133
134        if self.commit_ts >= ctx.snapshot_ts {
135            return false;
136        }
137
138        // Must not be deleted, or deleted after snapshot
139        if self.deleted_by != 0 && self.deleted_ts < ctx.snapshot_ts {
140            return false;
141        }
142
143        true
144    }
145
146    /// Mark as committed
147    pub fn commit(&mut self, commit_ts: Timestamp) {
148        self.commit_ts = commit_ts;
149    }
150
151    /// Mark as deleted
152    pub fn delete(&mut self, deleted_by: TxnId, deleted_ts: Timestamp) {
153        self.deleted_by = deleted_by;
154        self.deleted_ts = deleted_ts;
155    }
156
157    /// Check if version is committed
158    pub fn is_committed(&self) -> bool {
159        self.commit_ts != 0
160    }
161
162    /// Check if version is deleted
163    pub fn is_deleted(&self) -> bool {
164        self.deleted_by != 0
165    }
166}
167
168/// Trait for MVCC version chain implementations
169///
170/// Implementors store multiple versions of a value and provide
171/// visibility-based access according to MVCC semantics.
172pub trait MvccVersionChain {
173    /// The value type stored in versions
174    type Value;
175
176    /// Get the visible version for the given context
177    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value>;
178
179    /// Get the latest version (regardless of visibility)
180    fn get_latest(&self) -> Option<&Self::Value>;
181
182    /// Number of versions in the chain
183    fn version_count(&self) -> usize;
184
185    /// Check if the chain is empty
186    fn is_empty(&self) -> bool {
187        self.version_count() == 0
188    }
189}
190
191/// Trait for mutable version chain operations
192pub trait MvccVersionChainMut: MvccVersionChain {
193    /// Add a new uncommitted version
194    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId);
195
196    /// Commit a version
197    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool;
198
199    /// Mark the latest visible version as deleted
200    fn delete_version(&mut self, txn_id: TxnId, delete_ts: Timestamp) -> bool;
201
202    /// Garbage collect versions older than the given timestamp
203    /// Returns (versions_removed, bytes_freed)
204    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize);
205}
206
207/// Trait for detecting write conflicts
208pub trait WriteConflictDetection {
209    /// Check if there's a write-write conflict with another transaction
210    fn has_write_conflict(&self, txn_id: TxnId) -> bool;
211}
212
213// =============================================================================
214// Rec 6: Compile-Time Concurrency Policy Markers
215// =============================================================================
216
217/// Marker trait for version chain concurrency strategy.
218///
219/// Implementors tag their version chain with the concurrency mechanism used,
220/// enabling generic code to select appropriate strategies at compile time.
221pub trait ConcurrencyPolicy: Send + Sync {
222    /// Human-readable name for diagnostics
223    const NAME: &'static str;
224}
225
226/// No internal synchronization — caller must hold external lock (DashMap shard, RwLock, etc.)
227pub struct ExternalLock;
228impl ConcurrencyPolicy for ExternalLock {
229    const NAME: &'static str = "ExternalLock";
230}
231
232/// Internal RwLock — chain can be shared across threads with &self methods
233pub struct InternalRwLock;
234impl ConcurrencyPolicy for InternalRwLock {
235    const NAME: &'static str = "InternalRwLock";
236}
237
238/// Lock-free atomics — CAS-based, fully concurrent &self methods
239pub struct LockFreeAtomic;
240impl ConcurrencyPolicy for LockFreeAtomic {
241    const NAME: &'static str = "LockFreeAtomic";
242}
243
244// Safety: marker types are stateless
245unsafe impl Send for ExternalLock {}
246unsafe impl Sync for ExternalLock {}
247unsafe impl Send for InternalRwLock {}
248unsafe impl Sync for InternalRwLock {}
249unsafe impl Send for LockFreeAtomic {}
250unsafe impl Sync for LockFreeAtomic {}
251
252// =============================================================================
253// Rec 11: Consolidated Binary-Search Version Chain
254// =============================================================================
255
256/// Trait for version entry types used in binary-search chains.
257///
258/// Implemented by `durable_storage::Version` and `mvcc_concurrent::VersionEntry`
259/// to allow a single `BinarySearchChain<E>` to handle both.
260pub trait ChainEntry: Sized + std::fmt::Debug {
261    /// Get the commit timestamp (0 = uncommitted)
262    fn commit_ts(&self) -> u64;
263    /// Get the transaction ID that created this version
264    fn txn_id(&self) -> u64;
265    /// Set the commit timestamp (called during commit)
266    fn set_commit_ts(&mut self, ts: u64);
267}
268
269/// A binary-search optimized version chain — the consolidated core.
270///
271/// Stores committed versions sorted descending by `commit_ts`, with at most
272/// one uncommitted version slot. Uses `partition_point` for O(log V) lookups.
273///
274/// This struct captures the duplicated logic previously in:
275/// - `durable_storage::VersionChain`
276/// - `mvcc_concurrent::VersionChain`
277///
278/// Both modules now wrap `BinarySearchChain<E>` and delegate the core
279/// binary-search / commit / abort / read / conflict-check operations to it.
280#[derive(Debug)]
281pub struct BinarySearchChain<E: ChainEntry> {
282    /// Committed versions sorted by commit_ts DESCENDING (newest first)
283    committed: Vec<E>,
284    /// Single uncommitted version slot (at most one per transaction writing this key)
285    uncommitted: Option<E>,
286}
287
288impl<E: ChainEntry> Default for BinarySearchChain<E> {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
294impl<E: ChainEntry> BinarySearchChain<E> {
295    /// Create a new empty chain.
296    #[inline]
297    pub fn new() -> Self {
298        Self {
299            committed: Vec::new(),
300            uncommitted: None,
301        }
302    }
303
304    // ---- Uncommitted slot management ----
305
306    /// Replace the uncommitted slot. Returns the previous entry if any.
307    #[inline]
308    pub fn set_uncommitted(&mut self, entry: E) -> Option<E> {
309        self.uncommitted.replace(entry)
310    }
311
312    /// Reference to the uncommitted entry.
313    #[inline]
314    pub fn uncommitted(&self) -> Option<&E> {
315        self.uncommitted.as_ref()
316    }
317
318    /// Mutable reference to the uncommitted entry.
319    #[inline]
320    pub fn uncommitted_mut(&mut self) -> Option<&mut E> {
321        self.uncommitted.as_mut()
322    }
323
324    // ---- Core operations ----
325
326    /// Commit the uncommitted version with the given `commit_ts`.
327    ///
328    /// Moves the entry from the uncommitted slot into the sorted committed
329    /// list at the correct position. Returns `true` on success.
330    ///
331    /// Complexity: O(log V) binary search + O(V) insert (amortised O(1) for newest).
332    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
333        if let Some(ref mut v) = self.uncommitted {
334            if v.txn_id() == txn_id && v.commit_ts() == 0 {
335                v.set_commit_ts(commit_ts);
336                let committed_version = self.uncommitted.take().unwrap();
337                let insert_pos = self
338                    .committed
339                    .partition_point(|e| e.commit_ts() > commit_ts);
340                self.committed.insert(insert_pos, committed_version);
341                return true;
342            }
343        }
344        false
345    }
346
347    /// Abort a transaction — clear the uncommitted slot if it matches `txn_id`.
348    #[inline]
349    pub fn abort(&mut self, txn_id: u64) {
350        if let Some(ref v) = self.uncommitted {
351            if v.txn_id() == txn_id {
352                self.uncommitted = None;
353            }
354        }
355    }
356
357    /// Read the visible version at the given snapshot timestamp.
358    ///
359    /// If `current_txn_id` is provided and matches the uncommitted version's
360    /// transaction, the uncommitted version is returned (read-own-writes).
361    ///
362    /// Otherwise performs O(log V) binary search for the newest committed
363    /// version with `commit_ts < snapshot_ts`.
364    #[inline]
365    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&E> {
366        if let Some(txn_id) = current_txn_id {
367            if let Some(ref v) = self.uncommitted {
368                if v.txn_id() == txn_id {
369                    return Some(v);
370                }
371            }
372        }
373        let idx = self
374            .committed
375            .partition_point(|v| v.commit_ts() >= snapshot_ts);
376        self.committed.get(idx)
377    }
378
379    /// Check if there's a write-write conflict with another transaction.
380    #[inline]
381    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
382        if let Some(ref v) = self.uncommitted {
383            return v.txn_id() != my_txn_id;
384        }
385        false
386    }
387
388    /// GC versions older than `min_active_ts`.
389    ///
390    /// Retention must agree with [`read_at`](Self::read_at), whose visibility
391    /// rule is **strict**: a reader at snapshot `S` observes the newest version
392    /// with `commit_ts < S`. The smallest live snapshot is `min_active_ts`, so
393    /// the oldest version any reader can still need is the newest one with
394    /// `commit_ts < min_active_ts`. We therefore keep every version with
395    /// `commit_ts >= min_active_ts` **plus one anchor** — the newest version
396    /// strictly below `min_active_ts`.
397    ///
398    /// Using `>=` here (rather than `>`) is load-bearing for read-safety: with
399    /// `>`, a boundary version whose `commit_ts == min_active_ts` would be
400    /// chosen as the anchor and the genuinely-needed older version (the one a
401    /// reader at `snapshot == min_active_ts` resolves, since that boundary
402    /// version is *not* visible to it under the strict `<` rule) would be
403    /// freed — causing the reader to observe a spurious `None`.
404    pub fn gc_by_ts(&mut self, min_active_ts: u64) {
405        if self.committed.len() <= 1 {
406            return;
407        }
408        let split_idx = self
409            .committed
410            .partition_point(|v| v.commit_ts() >= min_active_ts);
411        let keep_count = if split_idx < self.committed.len() {
412            split_idx + 1
413        } else {
414            split_idx
415        };
416        self.committed.truncate(keep_count);
417    }
418
419    // ---- Accessors ----
420
421    /// Total version count (committed + uncommitted).
422    #[inline]
423    pub fn version_count(&self) -> usize {
424        self.committed.len() + usize::from(self.uncommitted.is_some())
425    }
426
427    /// Number of committed versions.
428    #[inline]
429    pub fn committed_count(&self) -> usize {
430        self.committed.len()
431    }
432
433    /// Check if chain is empty.
434    #[inline]
435    pub fn is_empty(&self) -> bool {
436        self.committed.is_empty() && self.uncommitted.is_none()
437    }
438
439    /// Slice of committed versions (newest first).
440    #[inline]
441    pub fn committed_versions(&self) -> &[E] {
442        &self.committed
443    }
444
445    /// Mutable access to committed versions (for custom GC).
446    #[inline]
447    pub fn committed_versions_mut(&mut self) -> &mut Vec<E> {
448        &mut self.committed
449    }
450
451    /// Latest version: uncommitted if present, else newest committed.
452    #[inline]
453    pub fn latest(&self) -> Option<&E> {
454        self.uncommitted.as_ref().or_else(|| self.committed.first())
455    }
456
457    /// Newest committed version only.
458    #[inline]
459    pub fn latest_committed(&self) -> Option<&E> {
460        self.committed.first()
461    }
462}
463
464// =============================================================================
465// Rec 11: Unified MVCC Store Trait
466// =============================================================================
467
468/// Error type for MVCC store operations.
469#[derive(Debug)]
470pub enum MvccStoreError {
471    /// Another uncommitted write exists on this key
472    WriteConflict,
473}
474
475impl std::fmt::Display for MvccStoreError {
476    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477        match self {
478            Self::WriteConflict => write!(f, "write-write conflict"),
479        }
480    }
481}
482
483impl std::error::Error for MvccStoreError {}
484
485/// GC statistics returned by `MvccStore::mvcc_gc`.
486#[derive(Debug, Default, Clone)]
487pub struct MvccGcStats {
488    pub versions_removed: usize,
489    pub keys_scanned: usize,
490}
491
492/// Unified MVCC key-value store trait.
493///
494/// Provides a common interface for:
495/// - `durable_storage::MvccMemTable`
496/// - `mvcc_concurrent::VersionStore`
497/// - `epoch_mvcc::EpochMvccStore`
498///
499/// Callers can program against this trait to be agnostic to the
500/// underlying concurrency / storage strategy.
501pub trait MvccStore: Send + Sync {
502    /// Read the visible value at `snapshot_ts`, optionally seeing own writes
503    /// from `txn_id`.
504    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>>;
505
506    /// Write a value (or tombstone `None`) as uncommitted for the given transaction.
507    fn mvcc_put(
508        &self,
509        key: &[u8],
510        value: Option<Vec<u8>>,
511        txn_id: u64,
512    ) -> Result<(), MvccStoreError>;
513
514    /// Commit one key's uncommitted write. Returns `true` if found and committed.
515    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool;
516
517    /// Abort one key's uncommitted write.
518    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64);
519
520    /// Check if there's an uncommitted write conflict on a key.
521    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool;
522
523    /// Run garbage collection. Returns statistics.
524    fn mvcc_gc(&self, min_ts: u64) -> MvccGcStats;
525
526    /// Number of distinct keys in the store.
527    fn mvcc_key_count(&self) -> usize;
528}
529
530#[cfg(test)]
531mod tests {
532    use super::*;
533
534    #[test]
535    fn test_version_meta_visibility() {
536        let mut meta = VersionMeta::new_uncommitted(1, 100);
537
538        // Uncommitted - only visible to creator
539        let ctx = VisibilityContext::new(1, 200);
540        assert!(meta.is_visible(&ctx));
541
542        let ctx2 = VisibilityContext::new(2, 200);
543        assert!(!meta.is_visible(&ctx2));
544
545        // After commit - visible to later snapshots
546        meta.commit(150);
547        assert!(meta.is_visible(&ctx2));
548
549        // Not visible to earlier snapshots
550        let ctx3 = VisibilityContext::new(3, 100);
551        assert!(!meta.is_visible(&ctx3));
552    }
553
554    #[test]
555    fn test_version_meta_deletion() {
556        let mut meta = VersionMeta::new_uncommitted(1, 100);
557        meta.commit(150);
558        meta.delete(2, 200);
559
560        // Visible before deletion
561        let ctx = VisibilityContext::new(3, 180);
562        assert!(meta.is_visible(&ctx));
563
564        // Not visible after deletion
565        let ctx2 = VisibilityContext::new(3, 250);
566        assert!(!meta.is_visible(&ctx2));
567    }
568
569    #[test]
570    fn test_visibility_context_committed_before() {
571        let mut active = std::collections::HashSet::new();
572        active.insert(5);
573
574        let ctx = VisibilityContext::with_active_txns(1, 200, active);
575
576        // Committed before snapshot
577        assert!(ctx.is_committed_before(2, Some(100)));
578
579        // Committed after snapshot
580        assert!(!ctx.is_committed_before(3, Some(250)));
581
582        // Active transaction - not committed
583        assert!(!ctx.is_committed_before(5, Some(100)));
584
585        // No commit timestamp
586        assert!(!ctx.is_committed_before(6, None));
587    }
588
589    #[test]
590    fn test_concurrency_policy_names() {
591        assert_eq!(ExternalLock::NAME, "ExternalLock");
592        assert_eq!(InternalRwLock::NAME, "InternalRwLock");
593        assert_eq!(LockFreeAtomic::NAME, "LockFreeAtomic");
594    }
595
596    // ---- BinarySearchChain tests (Rec 11) ----
597
598    /// Minimal entry type for testing the consolidated chain.
599    #[derive(Debug, Clone)]
600    struct TestEntry {
601        commit_ts: u64,
602        txn_id: u64,
603        val: i32,
604    }
605
606    impl ChainEntry for TestEntry {
607        fn commit_ts(&self) -> u64 {
608            self.commit_ts
609        }
610        fn txn_id(&self) -> u64 {
611            self.txn_id
612        }
613        fn set_commit_ts(&mut self, ts: u64) {
614            self.commit_ts = ts;
615        }
616    }
617
618    #[test]
619    fn test_binary_search_chain_commit_and_read() {
620        let mut chain = BinarySearchChain::<TestEntry>::new();
621        assert!(chain.is_empty());
622
623        // Add uncommitted, then commit
624        chain.set_uncommitted(TestEntry {
625            commit_ts: 0,
626            txn_id: 1,
627            val: 10,
628        });
629        assert_eq!(chain.version_count(), 1);
630
631        // Read own writes
632        let v = chain.read_at(100, Some(1)).unwrap();
633        assert_eq!(v.val, 10);
634
635        // Other txn can't see it
636        assert!(chain.read_at(100, Some(2)).is_none());
637
638        // Commit
639        assert!(chain.commit(1, 50));
640        assert_eq!(chain.committed_count(), 1);
641
642        // Now visible to snapshot_ts > 50
643        let v = chain.read_at(51, None).unwrap();
644        assert_eq!(v.val, 10);
645
646        // Not visible to snapshot_ts <= 50
647        assert!(chain.read_at(50, None).is_none());
648    }
649
650    #[test]
651    fn test_binary_search_chain_abort() {
652        let mut chain = BinarySearchChain::<TestEntry>::new();
653        chain.set_uncommitted(TestEntry {
654            commit_ts: 0,
655            txn_id: 1,
656            val: 10,
657        });
658        chain.abort(1);
659        assert!(chain.is_empty());
660        // Abort wrong txn is a no-op
661        chain.set_uncommitted(TestEntry {
662            commit_ts: 0,
663            txn_id: 2,
664            val: 20,
665        });
666        chain.abort(1);
667        assert_eq!(chain.version_count(), 1);
668    }
669
670    #[test]
671    fn test_binary_search_chain_write_conflict() {
672        let mut chain = BinarySearchChain::<TestEntry>::new();
673        assert!(!chain.has_write_conflict(1));
674
675        chain.set_uncommitted(TestEntry {
676            commit_ts: 0,
677            txn_id: 1,
678            val: 10,
679        });
680        assert!(!chain.has_write_conflict(1)); // own txn
681        assert!(chain.has_write_conflict(2)); // other txn
682    }
683
684    #[test]
685    fn test_binary_search_chain_gc() {
686        let mut chain = BinarySearchChain::<TestEntry>::new();
687
688        // Commit 5 versions at ts 10, 20, 30, 40, 50
689        for i in 1..=5u64 {
690            chain.set_uncommitted(TestEntry {
691                commit_ts: 0,
692                txn_id: i,
693                val: i as i32,
694            });
695            chain.commit(i, i * 10);
696        }
697        assert_eq!(chain.committed_count(), 5);
698
699        // GC with min_active_ts = 25 → keep ts > 25 (30, 40, 50) + 1 anchor (20)
700        chain.gc_by_ts(25);
701        assert_eq!(chain.committed_count(), 4); // 50, 40, 30, 20
702
703        // GC with min_active_ts = 45 → keep ts > 45 (50) + 1 anchor (40)
704        chain.gc_by_ts(45);
705        assert_eq!(chain.committed_count(), 2); // 50, 40
706    }
707
708    #[test]
709    fn test_binary_search_chain_multiple_versions() {
710        let mut chain = BinarySearchChain::<TestEntry>::new();
711
712        // Commit in order: ts=100, ts=200, ts=300
713        for (i, ts) in [100u64, 200, 300].iter().enumerate() {
714            let txn = (i + 1) as u64;
715            chain.set_uncommitted(TestEntry {
716                commit_ts: 0,
717                txn_id: txn,
718                val: *ts as i32,
719            });
720            chain.commit(txn, *ts);
721        }
722
723        // Snapshot at 150 → sees ts=100
724        assert_eq!(chain.read_at(150, None).unwrap().val, 100);
725        // Snapshot at 250 → sees ts=200
726        assert_eq!(chain.read_at(250, None).unwrap().val, 200);
727        // Snapshot at 350 → sees ts=300
728        assert_eq!(chain.read_at(350, None).unwrap().val, 300);
729        // Snapshot at 50 → sees nothing
730        assert!(chain.read_at(50, None).is_none());
731    }
732}
733
734#[cfg(test)]
735mod version_chain_properties {
736    use super::*;
737    use proptest::prelude::*;
738
739    /// Minimal committed entry for property testing the chain's read/GC contract.
740    #[derive(Debug, Clone)]
741    struct PropEntry {
742        commit_ts: u64,
743        val: i32,
744    }
745
746    impl ChainEntry for PropEntry {
747        fn commit_ts(&self) -> u64 {
748            self.commit_ts
749        }
750        fn txn_id(&self) -> u64 {
751            0
752        }
753        fn set_commit_ts(&mut self, ts: u64) {
754            self.commit_ts = ts;
755        }
756    }
757
758    /// Build a chain from a set of distinct commit timestamps (committed in
759    /// ascending order, mirroring how `MvccManager` allocates commit_ts).
760    fn build_chain(mut tss: Vec<u64>) -> BinarySearchChain<PropEntry> {
761        tss.sort_unstable();
762        tss.dedup();
763        let mut chain = BinarySearchChain::<PropEntry>::new();
764        for ts in tss {
765            chain.set_uncommitted(PropEntry {
766                commit_ts: 0,
767                val: ts as i32,
768            });
769            // txn_id 0 is fine here: one writer at a time in this model.
770            chain.commit(0, ts);
771        }
772        chain
773    }
774
775    /// Reference visibility oracle: the value a reader at `snapshot` must see is
776    /// the largest commit_ts strictly less than `snapshot` (strict `<` rule).
777    fn expected_visible(tss: &[u64], snapshot: u64) -> Option<i32> {
778        tss.iter()
779            .copied()
780            .filter(|&ts| ts < snapshot)
781            .max()
782            .map(|ts| ts as i32)
783    }
784
785    proptest! {
786        /// `read_at` matches the strict-visibility oracle for any snapshot.
787        #[test]
788        fn prop_read_at_matches_strict_visibility(
789            tss in prop::collection::vec(1u64..1000, 0..32),
790            snapshot in 0u64..1100,
791        ) {
792            let mut sorted = tss.clone();
793            sorted.sort_unstable();
794            sorted.dedup();
795            let chain = build_chain(tss);
796
797            let got = chain.read_at(snapshot, None).map(|e| e.val);
798            prop_assert_eq!(got, expected_visible(&sorted, snapshot));
799        }
800
801        /// GC is read-safe: after `gc_by_ts(min_active_ts)`, every reader whose
802        /// snapshot is `>= min_active_ts` still observes exactly the same value
803        /// as before GC. This is the invariant the T8 anchor fix (`>=`) restores —
804        /// a too-aggressive GC would make such a reader see a spurious `None`.
805        #[test]
806        fn prop_gc_preserves_reads_for_live_snapshots(
807            tss in prop::collection::vec(1u64..1000, 1..32),
808            min_active in 1u64..1000,
809            // Several snapshots at or above the GC watermark.
810            extra in prop::collection::vec(0u64..200, 1..6),
811        ) {
812            let mut sorted = tss.clone();
813            sorted.sort_unstable();
814            sorted.dedup();
815
816            let mut chain = build_chain(tss);
817
818            // Snapshots that remain valid after GC are those >= min_active_ts.
819            let snapshots: Vec<u64> = extra.iter().map(|d| min_active + *d).collect();
820
821            // Capture pre-GC observations.
822            let before: Vec<Option<i32>> = snapshots
823                .iter()
824                .map(|&s| chain.read_at(s, None).map(|e| e.val))
825                .collect();
826
827            chain.gc_by_ts(min_active);
828
829            // Post-GC observations must be identical for every live snapshot.
830            for (i, &s) in snapshots.iter().enumerate() {
831                let after = chain.read_at(s, None).map(|e| e.val);
832                prop_assert_eq!(after, before[i],
833                    "GC at {} changed read at snapshot {}", min_active, s);
834                // And it must still equal the oracle.
835                prop_assert_eq!(after, expected_visible(&sorted, s));
836            }
837        }
838    }
839}