sochdb_core/version_chain.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Unified MVCC Version Chain Interface
19//!
20//! This module defines the canonical interface for MVCC version chains across SochDB.
21//! Multiple implementations exist for different subsystems, but they share these traits.
22//!
23//! ## Implementations
24//!
25//! | Implementation | Location | Use Case |
26//! |---------------|----------|----------|
27//! | `VersionChain` | `sochdb_core::epoch_gc` | Epoch-based GC with VecDeque |
28//! | `VersionChain` | `sochdb_storage::mvcc_snapshot` | Snapshot-based visibility |
29//! | `VersionChain` | `sochdb_storage::version_store` | Generic key-value MVCC |
30//! | `VersionChain` | `sochdb_storage::durable_storage` | Binary-search optimized |
31//!
32//! ## Visibility Semantics
33//!
34//! All implementations follow these MVCC visibility rules:
35//!
36//! 1. **Read Committed**: A version is visible if its creating transaction has committed
37//! before the reader's start timestamp.
38//!
39//! 2. **Snapshot Isolation**: A version is visible if:
40//! - It was committed before the reader's snapshot timestamp
41//! - It was not deleted, or deleted after the snapshot timestamp
42//!
43//! 3. **Serializable (SSI)**: Adds read-write conflict detection on top of SI.
44
/// Identifier of a transaction.
pub type TxnId = u64;

/// Logical timestamp used for snapshots, commits and deletions.
pub type Timestamp = u64;
50
51/// Version visibility context
52///
53/// Provides the information needed to determine if a version is visible
54/// to a particular reader/transaction.
55#[derive(Debug, Clone)]
56pub struct VisibilityContext {
57 /// Reader's transaction ID
58 pub reader_txn_id: TxnId,
59 /// Reader's snapshot timestamp
60 pub snapshot_ts: Timestamp,
61 /// Set of transaction IDs that are still active (not committed)
62 pub active_txn_ids: std::collections::HashSet<TxnId>,
63}
64
65impl VisibilityContext {
66 /// Create a new visibility context
67 pub fn new(reader_txn_id: TxnId, snapshot_ts: Timestamp) -> Self {
68 Self {
69 reader_txn_id,
70 snapshot_ts,
71 active_txn_ids: std::collections::HashSet::new(),
72 }
73 }
74
75 /// Create with active transaction set
76 pub fn with_active_txns(
77 reader_txn_id: TxnId,
78 snapshot_ts: Timestamp,
79 active_txn_ids: std::collections::HashSet<TxnId>,
80 ) -> Self {
81 Self {
82 reader_txn_id,
83 snapshot_ts,
84 active_txn_ids,
85 }
86 }
87
88 /// Check if a transaction was committed before this snapshot
89 pub fn is_committed_before(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
90 match commit_ts {
91 Some(ts) => ts < self.snapshot_ts && !self.active_txn_ids.contains(&txn_id),
92 None => false,
93 }
94 }
95}
96
97/// Version metadata
98///
99/// Common metadata for all version chain implementations.
100#[derive(Debug, Clone)]
101pub struct VersionMeta {
102 /// Transaction that created this version
103 pub created_by: TxnId,
104 /// Timestamp when this version was created
105 pub created_ts: Timestamp,
106 /// Transaction that deleted this version (0 = not deleted)
107 pub deleted_by: TxnId,
108 /// Timestamp when this version was deleted (0 = not deleted)
109 pub deleted_ts: Timestamp,
110 /// Commit timestamp (0 = not yet committed)
111 pub commit_ts: Timestamp,
112}
113
114impl VersionMeta {
115 /// Create metadata for a new uncommitted version
116 pub fn new_uncommitted(created_by: TxnId, created_ts: Timestamp) -> Self {
117 Self {
118 created_by,
119 created_ts,
120 deleted_by: 0,
121 deleted_ts: 0,
122 commit_ts: 0,
123 }
124 }
125
126 /// Check if this version is visible according to the context
127 pub fn is_visible(&self, ctx: &VisibilityContext) -> bool {
128 // Must be committed before snapshot
129 if self.commit_ts == 0 {
130 // Uncommitted - only visible to creating transaction
131 return self.created_by == ctx.reader_txn_id;
132 }
133
134 if self.commit_ts >= ctx.snapshot_ts {
135 return false;
136 }
137
138 // Must not be deleted, or deleted after snapshot
139 if self.deleted_by != 0 && self.deleted_ts < ctx.snapshot_ts {
140 return false;
141 }
142
143 true
144 }
145
146 /// Mark as committed
147 pub fn commit(&mut self, commit_ts: Timestamp) {
148 self.commit_ts = commit_ts;
149 }
150
151 /// Mark as deleted
152 pub fn delete(&mut self, deleted_by: TxnId, deleted_ts: Timestamp) {
153 self.deleted_by = deleted_by;
154 self.deleted_ts = deleted_ts;
155 }
156
157 /// Check if version is committed
158 pub fn is_committed(&self) -> bool {
159 self.commit_ts != 0
160 }
161
162 /// Check if version is deleted
163 pub fn is_deleted(&self) -> bool {
164 self.deleted_by != 0
165 }
166}
167
168/// Trait for MVCC version chain implementations
169///
170/// Implementors store multiple versions of a value and provide
171/// visibility-based access according to MVCC semantics.
172pub trait MvccVersionChain {
173 /// The value type stored in versions
174 type Value;
175
176 /// Get the visible version for the given context
177 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value>;
178
179 /// Get the latest version (regardless of visibility)
180 fn get_latest(&self) -> Option<&Self::Value>;
181
182 /// Number of versions in the chain
183 fn version_count(&self) -> usize;
184
185 /// Check if the chain is empty
186 fn is_empty(&self) -> bool {
187 self.version_count() == 0
188 }
189}
190
191/// Trait for mutable version chain operations
192pub trait MvccVersionChainMut: MvccVersionChain {
193 /// Add a new uncommitted version
194 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId);
195
196 /// Commit a version
197 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool;
198
199 /// Mark the latest visible version as deleted
200 fn delete_version(&mut self, txn_id: TxnId, delete_ts: Timestamp) -> bool;
201
202 /// Garbage collect versions older than the given timestamp
203 /// Returns (versions_removed, bytes_freed)
204 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize);
205}
206
207/// Trait for detecting write conflicts
208pub trait WriteConflictDetection {
209 /// Check if there's a write-write conflict with another transaction
210 fn has_write_conflict(&self, txn_id: TxnId) -> bool;
211}
212
213// =============================================================================
214// Rec 6: Compile-Time Concurrency Policy Markers
215// =============================================================================
216
/// Marker trait for version chain concurrency strategy.
///
/// Implementors tag their version chain with the concurrency mechanism used,
/// enabling generic code to select appropriate strategies at compile time.
pub trait ConcurrencyPolicy: Send + Sync {
    /// Human-readable name for diagnostics
    const NAME: &'static str;
}

/// No internal synchronization — caller must hold external lock (DashMap shard, RwLock, etc.)
#[derive(Debug, Clone, Copy, Default)]
pub struct ExternalLock;

impl ConcurrencyPolicy for ExternalLock {
    const NAME: &'static str = "ExternalLock";
}

/// Internal RwLock — chain can be shared across threads with &self methods
#[derive(Debug, Clone, Copy, Default)]
pub struct InternalRwLock;

impl ConcurrencyPolicy for InternalRwLock {
    const NAME: &'static str = "InternalRwLock";
}

/// Lock-free atomics — CAS-based, fully concurrent &self methods
#[derive(Debug, Clone, Copy, Default)]
pub struct LockFreeAtomic;

impl ConcurrencyPolicy for LockFreeAtomic {
    const NAME: &'static str = "LockFreeAtomic";
}

// The marker types have no fields, so the compiler derives `Send` and `Sync`
// for them automatically. No manual `unsafe impl` is needed to satisfy the
// `ConcurrencyPolicy: Send + Sync` supertrait bound.
251
252// =============================================================================
253// Rec 11: Consolidated Binary-Search Version Chain
254// =============================================================================
255
/// Contract for entries stored in a binary-search version chain.
///
/// Implemented by `durable_storage::Version` and `mvcc_concurrent::VersionEntry`
/// so that one `BinarySearchChain<E>` can serve both.
pub trait ChainEntry: Sized + std::fmt::Debug {
    /// Commit timestamp of this entry; `0` means the entry is uncommitted.
    fn commit_ts(&self) -> u64;

    /// ID of the transaction that created this entry.
    fn txn_id(&self) -> u64;

    /// Store the commit timestamp; invoked while the entry is being committed.
    fn set_commit_ts(&mut self, ts: u64);
}
268
269/// A binary-search optimized version chain — the consolidated core.
270///
271/// Stores committed versions sorted descending by `commit_ts`, with at most
272/// one uncommitted version slot. Uses `partition_point` for O(log V) lookups.
273///
274/// This struct captures the duplicated logic previously in:
275/// - `durable_storage::VersionChain`
276/// - `mvcc_concurrent::VersionChain`
277///
278/// Both modules now wrap `BinarySearchChain<E>` and delegate the core
279/// binary-search / commit / abort / read / conflict-check operations to it.
280#[derive(Debug)]
281pub struct BinarySearchChain<E: ChainEntry> {
282 /// Committed versions sorted by commit_ts DESCENDING (newest first)
283 committed: Vec<E>,
284 /// Single uncommitted version slot (at most one per transaction writing this key)
285 uncommitted: Option<E>,
286}
287
288impl<E: ChainEntry> Default for BinarySearchChain<E> {
289 fn default() -> Self {
290 Self::new()
291 }
292}
293
294impl<E: ChainEntry> BinarySearchChain<E> {
295 /// Create a new empty chain.
296 #[inline]
297 pub fn new() -> Self {
298 Self {
299 committed: Vec::new(),
300 uncommitted: None,
301 }
302 }
303
304 // ---- Uncommitted slot management ----
305
306 /// Replace the uncommitted slot. Returns the previous entry if any.
307 #[inline]
308 pub fn set_uncommitted(&mut self, entry: E) -> Option<E> {
309 self.uncommitted.replace(entry)
310 }
311
312 /// Reference to the uncommitted entry.
313 #[inline]
314 pub fn uncommitted(&self) -> Option<&E> {
315 self.uncommitted.as_ref()
316 }
317
318 /// Mutable reference to the uncommitted entry.
319 #[inline]
320 pub fn uncommitted_mut(&mut self) -> Option<&mut E> {
321 self.uncommitted.as_mut()
322 }
323
324 // ---- Core operations ----
325
326 /// Commit the uncommitted version with the given `commit_ts`.
327 ///
328 /// Moves the entry from the uncommitted slot into the sorted committed
329 /// list at the correct position. Returns `true` on success.
330 ///
331 /// Complexity: O(log V) binary search + O(V) insert (amortised O(1) for newest).
332 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
333 if let Some(ref mut v) = self.uncommitted {
334 if v.txn_id() == txn_id && v.commit_ts() == 0 {
335 v.set_commit_ts(commit_ts);
336 let committed_version = self.uncommitted.take().unwrap();
337 let insert_pos = self
338 .committed
339 .partition_point(|e| e.commit_ts() > commit_ts);
340 self.committed.insert(insert_pos, committed_version);
341 return true;
342 }
343 }
344 false
345 }
346
347 /// Abort a transaction — clear the uncommitted slot if it matches `txn_id`.
348 #[inline]
349 pub fn abort(&mut self, txn_id: u64) {
350 if let Some(ref v) = self.uncommitted {
351 if v.txn_id() == txn_id {
352 self.uncommitted = None;
353 }
354 }
355 }
356
357 /// Read the visible version at the given snapshot timestamp.
358 ///
359 /// If `current_txn_id` is provided and matches the uncommitted version's
360 /// transaction, the uncommitted version is returned (read-own-writes).
361 ///
362 /// Otherwise performs O(log V) binary search for the newest committed
363 /// version with `commit_ts < snapshot_ts`.
364 #[inline]
365 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&E> {
366 if let Some(txn_id) = current_txn_id {
367 if let Some(ref v) = self.uncommitted {
368 if v.txn_id() == txn_id {
369 return Some(v);
370 }
371 }
372 }
373 let idx = self
374 .committed
375 .partition_point(|v| v.commit_ts() >= snapshot_ts);
376 self.committed.get(idx)
377 }
378
379 /// Check if there's a write-write conflict with another transaction.
380 #[inline]
381 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
382 if let Some(ref v) = self.uncommitted {
383 return v.txn_id() != my_txn_id;
384 }
385 false
386 }
387
388 /// GC versions older than `min_active_ts`.
389 ///
390 /// Keeps all versions with `commit_ts > min_active_ts` plus one anchor
391 /// version at or below the threshold (for new snapshots).
392 pub fn gc_by_ts(&mut self, min_active_ts: u64) {
393 if self.committed.len() <= 1 {
394 return;
395 }
396 let split_idx = self
397 .committed
398 .partition_point(|v| v.commit_ts() > min_active_ts);
399 let keep_count = if split_idx < self.committed.len() {
400 split_idx + 1
401 } else {
402 split_idx
403 };
404 self.committed.truncate(keep_count);
405 }
406
407 // ---- Accessors ----
408
409 /// Total version count (committed + uncommitted).
410 #[inline]
411 pub fn version_count(&self) -> usize {
412 self.committed.len() + usize::from(self.uncommitted.is_some())
413 }
414
415 /// Number of committed versions.
416 #[inline]
417 pub fn committed_count(&self) -> usize {
418 self.committed.len()
419 }
420
421 /// Check if chain is empty.
422 #[inline]
423 pub fn is_empty(&self) -> bool {
424 self.committed.is_empty() && self.uncommitted.is_none()
425 }
426
427 /// Slice of committed versions (newest first).
428 #[inline]
429 pub fn committed_versions(&self) -> &[E] {
430 &self.committed
431 }
432
433 /// Mutable access to committed versions (for custom GC).
434 #[inline]
435 pub fn committed_versions_mut(&mut self) -> &mut Vec<E> {
436 &mut self.committed
437 }
438
439 /// Latest version: uncommitted if present, else newest committed.
440 #[inline]
441 pub fn latest(&self) -> Option<&E> {
442 self.uncommitted.as_ref().or_else(|| self.committed.first())
443 }
444
445 /// Newest committed version only.
446 #[inline]
447 pub fn latest_committed(&self) -> Option<&E> {
448 self.committed.first()
449 }
450}
451
452// =============================================================================
453// Rec 11: Unified MVCC Store Trait
454// =============================================================================
455
/// Errors reported by MVCC store operations.
#[derive(Debug)]
pub enum MvccStoreError {
    /// The key already carries an uncommitted write from another transaction.
    WriteConflict,
}

impl std::fmt::Display for MvccStoreError {
    /// Write the human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::WriteConflict => "write-write conflict",
        };
        f.write_str(message)
    }
}

impl std::error::Error for MvccStoreError {}
472
/// Statistics reported by a `MvccStore::mvcc_gc` run.
///
/// Both counters start at zero when built with `Default`.
#[derive(Clone, Debug, Default)]
pub struct MvccGcStats {
    /// Count of versions removed.
    pub versions_removed: usize,
    /// Count of keys scanned.
    pub keys_scanned: usize,
}
479
480/// Unified MVCC key-value store trait.
481///
482/// Provides a common interface for:
483/// - `durable_storage::MvccMemTable`
484/// - `mvcc_concurrent::VersionStore`
485/// - `epoch_mvcc::EpochMvccStore`
486///
487/// Callers can program against this trait to be agnostic to the
488/// underlying concurrency / storage strategy.
489pub trait MvccStore: Send + Sync {
490 /// Read the visible value at `snapshot_ts`, optionally seeing own writes
491 /// from `txn_id`.
492 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>>;
493
494 /// Write a value (or tombstone `None`) as uncommitted for the given transaction.
495 fn mvcc_put(
496 &self,
497 key: &[u8],
498 value: Option<Vec<u8>>,
499 txn_id: u64,
500 ) -> Result<(), MvccStoreError>;
501
502 /// Commit one key's uncommitted write. Returns `true` if found and committed.
503 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool;
504
505 /// Abort one key's uncommitted write.
506 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64);
507
508 /// Check if there's an uncommitted write conflict on a key.
509 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool;
510
511 /// Run garbage collection. Returns statistics.
512 fn mvcc_gc(&self, min_ts: u64) -> MvccGcStats;
513
514 /// Number of distinct keys in the store.
515 fn mvcc_key_count(&self) -> usize;
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal entry type for exercising the consolidated chain (Rec 11).
    #[derive(Debug, Clone)]
    struct TestEntry {
        commit_ts: u64,
        txn_id: u64,
        val: i32,
    }

    impl ChainEntry for TestEntry {
        fn commit_ts(&self) -> u64 {
            self.commit_ts
        }

        fn txn_id(&self) -> u64 {
            self.txn_id
        }

        fn set_commit_ts(&mut self, ts: u64) {
            self.commit_ts = ts;
        }
    }

    /// Build an entry that is still uncommitted (`commit_ts == 0`).
    fn pending(txn_id: u64, val: i32) -> TestEntry {
        TestEntry {
            commit_ts: 0,
            txn_id,
            val,
        }
    }

    // ---- VersionMeta / VisibilityContext tests ----

    #[test]
    fn test_version_meta_visibility() {
        let creator = VisibilityContext::new(1, 200);
        let later_reader = VisibilityContext::new(2, 200);
        let earlier_reader = VisibilityContext::new(3, 100);

        let mut meta = VersionMeta::new_uncommitted(1, 100);

        // While uncommitted, only the creating transaction sees the version.
        assert!(meta.is_visible(&creator));
        assert!(!meta.is_visible(&later_reader));

        // Once committed at 150, a snapshot taken at 200 sees it ...
        meta.commit(150);
        assert!(meta.is_visible(&later_reader));

        // ... but a snapshot taken at 100 does not.
        assert!(!meta.is_visible(&earlier_reader));
    }

    #[test]
    fn test_version_meta_deletion() {
        let mut meta = VersionMeta::new_uncommitted(1, 100);
        meta.commit(150);
        meta.delete(2, 200);

        // Snapshot taken before the deletion timestamp still sees the version.
        let before_delete = VisibilityContext::new(3, 180);
        assert!(meta.is_visible(&before_delete));

        // Snapshot taken after the deletion timestamp does not.
        let after_delete = VisibilityContext::new(3, 250);
        assert!(!meta.is_visible(&after_delete));
    }

    #[test]
    fn test_visibility_context_committed_before() {
        let active: std::collections::HashSet<TxnId> = std::iter::once(5).collect();
        let ctx = VisibilityContext::with_active_txns(1, 200, active);

        // Commit timestamp below the snapshot.
        assert!(ctx.is_committed_before(2, Some(100)));

        // Commit timestamp above the snapshot.
        assert!(!ctx.is_committed_before(3, Some(250)));

        // Transaction listed as active is treated as not committed.
        assert!(!ctx.is_committed_before(5, Some(100)));

        // Missing commit timestamp.
        assert!(!ctx.is_committed_before(6, None));
    }

    #[test]
    fn test_concurrency_policy_names() {
        assert_eq!(ExternalLock::NAME, "ExternalLock");
        assert_eq!(InternalRwLock::NAME, "InternalRwLock");
        assert_eq!(LockFreeAtomic::NAME, "LockFreeAtomic");
    }

    // ---- BinarySearchChain tests (Rec 11) ----

    #[test]
    fn test_binary_search_chain_commit_and_read() {
        let mut chain = BinarySearchChain::<TestEntry>::new();
        assert!(chain.is_empty());

        // Stage an uncommitted write from txn 1.
        chain.set_uncommitted(pending(1, 10));
        assert_eq!(chain.version_count(), 1);

        // The writer reads its own write.
        let own = chain.read_at(100, Some(1)).unwrap();
        assert_eq!(own.val, 10);

        // Another transaction does not see it.
        assert!(chain.read_at(100, Some(2)).is_none());

        // Commit at ts 50.
        assert!(chain.commit(1, 50));
        assert_eq!(chain.committed_count(), 1);

        // Visible for snapshot_ts > 50.
        let seen = chain.read_at(51, None).unwrap();
        assert_eq!(seen.val, 10);

        // Not visible for snapshot_ts <= 50.
        assert!(chain.read_at(50, None).is_none());
    }

    #[test]
    fn test_binary_search_chain_abort() {
        let mut chain = BinarySearchChain::<TestEntry>::new();

        chain.set_uncommitted(pending(1, 10));
        chain.abort(1);
        assert!(chain.is_empty());

        // Aborting a transaction that does not own the slot changes nothing.
        chain.set_uncommitted(pending(2, 20));
        chain.abort(1);
        assert_eq!(chain.version_count(), 1);
    }

    #[test]
    fn test_binary_search_chain_write_conflict() {
        let mut chain = BinarySearchChain::<TestEntry>::new();
        assert!(!chain.has_write_conflict(1));

        chain.set_uncommitted(pending(1, 10));
        assert!(!chain.has_write_conflict(1)); // slot owner
        assert!(chain.has_write_conflict(2)); // any other txn
    }

    #[test]
    fn test_binary_search_chain_gc() {
        let mut chain = BinarySearchChain::<TestEntry>::new();

        // Five versions committed at ts 10, 20, 30, 40, 50.
        for txn in 1..=5u64 {
            chain.set_uncommitted(pending(txn, txn as i32));
            chain.commit(txn, txn * 10);
        }
        assert_eq!(chain.committed_count(), 5);

        // Threshold 25: versions 30, 40, 50 stay, plus 20 as anchor.
        chain.gc_by_ts(25);
        assert_eq!(chain.committed_count(), 4);

        // Threshold 45: version 50 stays, plus 40 as anchor.
        chain.gc_by_ts(45);
        assert_eq!(chain.committed_count(), 2);
    }

    #[test]
    fn test_binary_search_chain_multiple_versions() {
        let mut chain = BinarySearchChain::<TestEntry>::new();

        // Commit in increasing order: ts=100, ts=200, ts=300 (val mirrors ts).
        for (txn, ts) in (1u64..).zip([100u64, 200, 300].iter().copied()) {
            chain.set_uncommitted(pending(txn, ts as i32));
            chain.commit(txn, ts);
        }

        // Each snapshot sees the newest version committed strictly before it.
        for (snapshot_ts, expected_val) in [(150u64, 100), (250, 200), (350, 300)].iter().copied() {
            assert_eq!(chain.read_at(snapshot_ts, None).unwrap().val, expected_val);
        }

        // A snapshot older than every commit sees nothing.
        assert!(chain.read_at(50, None).is_none());
    }
}
690}