Skip to main content

sochdb_storage/
mvcc_snapshot.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! MVCC Snapshot Plumbing (Task 3)
19//!
20//! Multi-Version Concurrency Control for snapshot isolation:
21//! - Readers don't block writers
22//! - Writers don't block readers
23//! - Consistent point-in-time snapshots
24//!
25//! ## Version Visibility Rules
26//!
27//! A version is visible to transaction T if:
28//! 1. xmin (creating txn) committed before T started
29//! 2. xmax (deleting txn) is either:
30//!    - Not set (version still active), OR
31//!    - Aborted, OR
32//!    - Committed after T started
33//!
34//! ```text
35//! Version: [xmin=10, xmax=20, data]
36//!
37//! Transaction T (start_ts=15):
38//!   - xmin=10 < 15 ✓ (created before T)
39//!   - xmax=20 > 15 ✓ (deleted after T started, so still visible)
40//!   → VISIBLE
41//!
42//! Transaction T' (start_ts=25):
43//!   - xmin=10 < 25 ✓
44//!   - xmax=20 < 25 ✗ (already deleted)
45//!   → NOT VISIBLE
46//! ```
47//!
48//! ## Snapshot Types
49//!
50//! - **Read Snapshot**: Fixed point-in-time view
51//! - **Serializable Snapshot**: Tracks read/write sets for conflict detection
52//!
53//! ## Lock-Free MVCC (Task 2 Enhancement)
54//!
55//! The `LockFreeMvccManager` uses epoch-based reclamation for:
56//! - Wait-free reads during visibility checks
57//! - Lock-free garbage collection
58//! - Reduced contention under high concurrency
59
60use crossbeam_epoch::{self as epoch, Atomic, Owned};
61use parking_lot::RwLock;
62use std::collections::{BTreeMap, HashMap, HashSet};
63use std::sync::Arc;
64use std::sync::atomic::{AtomicU64, Ordering};
65
/// Identifier handed out to each transaction when it begins.
pub type TxnId = u64;

/// Logical timestamp taken from a counter (not wall-clock time).
pub type Timestamp = u64;

/// Lifecycle state of a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnStatus {
    /// Still running; not yet committed or aborted.
    Active,
    /// Finished successfully; carries the commit timestamp.
    Committed(Timestamp),
    /// Rolled back.
    Aborted,
}
82
83/// Version metadata for a single row version
84#[derive(Debug, Clone)]
85pub struct VersionInfo {
86    /// Transaction that created this version
87    pub xmin: TxnId,
88    /// Transaction that deleted this version (or 0 if active)
89    pub xmax: TxnId,
90    /// Creation timestamp
91    pub created_ts: Timestamp,
92    /// Deletion timestamp (or MAX if active)
93    pub deleted_ts: Timestamp,
94}
95
96impl VersionInfo {
97    /// Create a new active version
98    pub fn new(xmin: TxnId, created_ts: Timestamp) -> Self {
99        Self {
100            xmin,
101            xmax: 0,
102            created_ts,
103            deleted_ts: Timestamp::MAX,
104        }
105    }
106
107    /// Mark version as deleted
108    pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) {
109        self.xmax = xmax;
110        self.deleted_ts = deleted_ts;
111    }
112
113    /// Check if version is visible to a snapshot
114    #[allow(deprecated)]
115    pub fn is_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> bool {
116        // Check xmin visibility
117        if self.xmin == snapshot.txn_id {
118            // Created by current transaction - visible
119            if self.xmax == snapshot.txn_id {
120                // Also deleted by current transaction - not visible
121                return false;
122            }
123            return true;
124        }
125
126        // xmin must be committed before snapshot
127        match txn_manager.get_status(self.xmin) {
128            Some(TxnStatus::Committed(commit_ts)) => {
129                if commit_ts >= snapshot.start_ts {
130                    return false; // Created after snapshot
131                }
132            }
133            Some(TxnStatus::Active) => {
134                // Created by an in-progress transaction - check if in snapshot's active set
135                if snapshot.active_txns.contains(&self.xmin) {
136                    return false; // Not yet committed when snapshot was taken
137                }
138                return false; // Still not committed
139            }
140            Some(TxnStatus::Aborted) | None => {
141                return false; // Created by aborted transaction
142            }
143        }
144
145        // Check xmax visibility
146        if self.xmax == 0 {
147            return true; // Not deleted
148        }
149
150        if self.xmax == snapshot.txn_id {
151            return false; // Deleted by current transaction
152        }
153
154        // xmax must NOT be committed before snapshot
155        match txn_manager.get_status(self.xmax) {
156            Some(TxnStatus::Committed(commit_ts)) => {
157                if commit_ts < snapshot.start_ts {
158                    return false; // Deleted before snapshot
159                }
160                true // Deleted after snapshot - still visible
161            }
162            Some(TxnStatus::Active) | Some(TxnStatus::Aborted) | None => {
163                true // Deletion not committed - still visible
164            }
165        }
166    }
167}
168
169/// Read snapshot for consistent point-in-time queries
170#[derive(Debug, Clone)]
171pub struct Snapshot {
172    /// Transaction ID that owns this snapshot
173    pub txn_id: TxnId,
174    /// Snapshot timestamp (all versions created before this are potentially visible)
175    pub start_ts: Timestamp,
176    /// Set of transaction IDs that were active when snapshot was taken
177    pub active_txns: HashSet<TxnId>,
178    /// Minimum active transaction ID (for garbage collection)
179    pub xmin: TxnId,
180}
181
182impl Snapshot {
183    /// Create a new snapshot
184    pub fn new(txn_id: TxnId, start_ts: Timestamp, active_txns: HashSet<TxnId>) -> Self {
185        let xmin = active_txns.iter().copied().min().unwrap_or(txn_id);
186        Self {
187            txn_id,
188            start_ts,
189            active_txns,
190            xmin,
191        }
192    }
193
194    /// Check if a transaction's changes are visible
195    pub fn is_txn_visible(&self, txn_id: TxnId, commit_ts: Option<Timestamp>) -> bool {
196        if txn_id == self.txn_id {
197            return true; // Own changes are visible
198        }
199
200        if self.active_txns.contains(&txn_id) {
201            return false; // Was in-progress when snapshot was taken
202        }
203
204        match commit_ts {
205            Some(ts) => ts < self.start_ts,
206            None => false, // Not committed
207        }
208    }
209}
210
211/// Transaction manager for MVCC (snapshot-only, no WAL durability)
212///
213/// # Deprecation Notice
214///
215/// **This implementation is deprecated for production use.** It provides snapshot
216/// isolation but does NOT include WAL integration for crash recovery.
217///
218/// ## Migration Guide
219///
220/// For production workloads requiring durability, use [`MvccTransactionManager`]
221/// from `sochdb_storage::wal_integration` which includes:
222/// - Write-ahead logging for crash recovery
223/// - Serializable Snapshot Isolation (SSI)
224/// - Group commit for high throughput
225/// - Event-driven async architecture
226///
227/// ## When to Use This Implementation
228///
229/// - Unit testing without durability overhead
230/// - Ephemeral in-memory operations
231/// - Prototyping snapshot isolation logic
232///
233/// ## See Also
234///
235/// - [`crate::wal_integration::MvccTransactionManager`] - Production transaction manager
236/// - [`crate::transaction::TransactionCoordinator`] - Unified transaction trait
237///
238/// [`MvccTransactionManager`]: crate::MvccTransactionManager
239#[deprecated(
240    since = "0.1.0",
241    note = "Use MvccTransactionManager from wal_integration for production workloads with durability"
242)]
243pub struct TransactionManager {
244    /// Next transaction ID
245    next_txn_id: AtomicU64,
246    /// Global timestamp counter
247    timestamp: AtomicU64,
248    /// Active transactions: txn_id -> (start_ts, status)
249    active_txns: RwLock<HashMap<TxnId, (Timestamp, TxnStatus)>>,
250    /// Committed transaction log: txn_id -> commit_ts
251    commit_log: RwLock<BTreeMap<TxnId, Timestamp>>,
252    /// Minimum active transaction ID (for GC)
253    min_active_txn: AtomicU64,
254}
255
256#[allow(deprecated)]
257impl TransactionManager {
258    /// Create a new transaction manager
259    pub fn new() -> Self {
260        Self {
261            next_txn_id: AtomicU64::new(1),
262            timestamp: AtomicU64::new(1),
263            active_txns: RwLock::new(HashMap::new()),
264            commit_log: RwLock::new(BTreeMap::new()),
265            min_active_txn: AtomicU64::new(u64::MAX),
266        }
267    }
268
269    /// Begin a new transaction
270    pub fn begin(&self) -> (TxnId, Timestamp) {
271        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
272        let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
273
274        {
275            let mut active = self.active_txns.write();
276            active.insert(txn_id, (start_ts, TxnStatus::Active));
277        }
278
279        // Update min active
280        self.update_min_active();
281
282        (txn_id, start_ts)
283    }
284
285    /// Acquire a read snapshot
286    pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
287        let active = self.active_txns.read();
288
289        let start_ts = active
290            .get(&txn_id)
291            .map(|(ts, _)| *ts)
292            .unwrap_or_else(|| self.timestamp.load(Ordering::SeqCst));
293
294        let active_set: HashSet<TxnId> = active
295            .iter()
296            .filter(|(id, (_, status))| **id != txn_id && *status == TxnStatus::Active)
297            .map(|(id, _)| *id)
298            .collect();
299
300        Snapshot::new(txn_id, start_ts, active_set)
301    }
302
303    /// Commit a transaction
304    pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
305        let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
306
307        {
308            let mut active = self.active_txns.write();
309            if let Some((_, status)) = active.get_mut(&txn_id) {
310                if *status != TxnStatus::Active {
311                    return None; // Already committed or aborted
312                }
313                *status = TxnStatus::Committed(commit_ts);
314            } else {
315                return None; // Unknown transaction
316            }
317        }
318
319        {
320            let mut log = self.commit_log.write();
321            log.insert(txn_id, commit_ts);
322        }
323
324        // Update min active
325        self.update_min_active();
326
327        Some(commit_ts)
328    }
329
330    /// Abort a transaction
331    pub fn abort(&self, txn_id: TxnId) -> bool {
332        let mut active = self.active_txns.write();
333
334        if let Some((_, status)) = active.get_mut(&txn_id) {
335            if *status != TxnStatus::Active {
336                return false;
337            }
338            *status = TxnStatus::Aborted;
339            self.update_min_active();
340            true
341        } else {
342            false
343        }
344    }
345
346    /// Get transaction status
347    pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
348        let active = self.active_txns.read();
349        active.get(&txn_id).map(|(_, status)| *status)
350    }
351
352    /// Get commit timestamp for a transaction
353    pub fn get_commit_ts(&self, txn_id: TxnId) -> Option<Timestamp> {
354        let log = self.commit_log.read();
355        log.get(&txn_id).copied()
356    }
357
358    /// Get minimum active transaction ID (for garbage collection)
359    pub fn min_active_txn_id(&self) -> TxnId {
360        self.min_active_txn.load(Ordering::SeqCst)
361    }
362
363    /// Get current timestamp
364    pub fn current_timestamp(&self) -> Timestamp {
365        self.timestamp.load(Ordering::SeqCst)
366    }
367
368    /// Update minimum active transaction ID
369    fn update_min_active(&self) {
370        let active = self.active_txns.read();
371        let min = active
372            .iter()
373            .filter(|(_, (_, status))| *status == TxnStatus::Active)
374            .map(|(&id, _)| id)
375            .min()
376            .unwrap_or(u64::MAX);
377        self.min_active_txn.store(min, Ordering::SeqCst);
378    }
379
380    /// Garbage collect old transaction records
381    ///
382    /// Removes committed transactions older than the watermark.
383    pub fn gc(&self, watermark: Timestamp) -> usize {
384        let mut log = self.commit_log.write();
385        let mut active = self.active_txns.write();
386
387        let old_len = log.len();
388
389        // Remove old committed entries
390        log.retain(|_, commit_ts| *commit_ts >= watermark);
391
392        // Remove old active entries that are committed
393        active.retain(|txn_id, (_, status)| match status {
394            TxnStatus::Committed(ts) => *ts >= watermark,
395            TxnStatus::Aborted => {
396                // Keep aborted if there might be references
397                log.get(txn_id).map(|t| *t >= watermark).unwrap_or(true)
398            }
399            TxnStatus::Active => true,
400        });
401
402        old_len - log.len()
403    }
404}
405
406#[allow(deprecated)]
407impl Default for TransactionManager {
408    fn default() -> Self {
409        Self::new()
410    }
411}
412
413/// MVCC version chain for a single key
414#[derive(Debug)]
415pub struct VersionChain<V> {
416    /// Versions ordered by creation timestamp (newest first)
417    versions: Vec<(VersionInfo, V)>,
418}
419
420impl<V: Clone> VersionChain<V> {
421    /// Create empty version chain
422    pub fn new() -> Self {
423        Self {
424            versions: Vec::new(),
425        }
426    }
427
428    /// Add a new version
429    pub fn add_version(&mut self, info: VersionInfo, value: V) {
430        // Insert at front (newest first)
431        self.versions.insert(0, (info, value));
432    }
433
434    /// Get visible version for a snapshot
435    #[allow(deprecated)]
436    pub fn get_visible(&self, snapshot: &Snapshot, txn_manager: &TransactionManager) -> Option<&V> {
437        for (info, value) in &self.versions {
438            if info.is_visible(snapshot, txn_manager) {
439                return Some(value);
440            }
441        }
442        None
443    }
444
445    /// Mark latest version as deleted
446    pub fn delete(&mut self, xmax: TxnId, deleted_ts: Timestamp) -> bool {
447        if let Some((info, _)) = self.versions.first_mut()
448            && info.xmax == 0
449        {
450            info.delete(xmax, deleted_ts);
451            return true;
452        }
453        false
454    }
455
456    /// Garbage collect old versions
457    pub fn gc(&mut self, min_visible_ts: Timestamp) -> usize {
458        let old_len = self.versions.len();
459
460        // Keep at least one version, and all versions visible to any active snapshot
461        if self.versions.len() <= 1 {
462            return 0;
463        }
464
465        self.versions
466            .retain(|(info, _)| info.deleted_ts >= min_visible_ts);
467
468        // Always keep at least one version
469        if self.versions.is_empty() {
470            return old_len; // All removed (shouldn't happen normally)
471        }
472
473        old_len - self.versions.len()
474    }
475
476    /// Number of versions
477    pub fn version_count(&self) -> usize {
478        self.versions.len()
479    }
480}
481
482impl<V: Clone> Default for VersionChain<V> {
483    fn default() -> Self {
484        Self::new()
485    }
486}
487
488/// MVCC-aware key-value store
489#[allow(deprecated)]
490pub struct MvccStore<V> {
491    /// Version chains by key
492    data: RwLock<HashMap<Vec<u8>, VersionChain<V>>>,
493    /// Transaction manager
494    txn_manager: Arc<TransactionManager>,
495}
496
497#[allow(deprecated)]
498impl<V: Clone + Send + Sync> MvccStore<V> {
499    /// Create a new MVCC store
500    pub fn new(txn_manager: Arc<TransactionManager>) -> Self {
501        Self {
502            data: RwLock::new(HashMap::new()),
503            txn_manager,
504        }
505    }
506
507    /// Put a value (creates new version)
508    pub fn put(&self, key: &[u8], value: V, txn_id: TxnId) -> Timestamp {
509        let created_ts = self.txn_manager.current_timestamp();
510        let info = VersionInfo::new(txn_id, created_ts);
511
512        let mut data = self.data.write();
513        let chain = data.entry(key.to_vec()).or_default();
514        chain.add_version(info, value);
515
516        created_ts
517    }
518
519    /// Get visible value for a snapshot
520    pub fn get(&self, key: &[u8], snapshot: &Snapshot) -> Option<V> {
521        let data = self.data.read();
522        data.get(key)
523            .and_then(|chain| chain.get_visible(snapshot, &self.txn_manager))
524            .cloned()
525    }
526
527    /// Delete a key (marks version as deleted)
528    pub fn delete(&self, key: &[u8], txn_id: TxnId) -> bool {
529        let deleted_ts = self.txn_manager.current_timestamp();
530        let mut data = self.data.write();
531
532        if let Some(chain) = data.get_mut(key) {
533            chain.delete(txn_id, deleted_ts)
534        } else {
535            false
536        }
537    }
538
539    /// Garbage collect old versions
540    pub fn gc(&self) -> usize {
541        let min_visible = self.txn_manager.min_active_txn_id();
542        let min_ts = self
543            .txn_manager
544            .get_commit_ts(min_visible)
545            .unwrap_or(self.txn_manager.current_timestamp());
546
547        let mut data = self.data.write();
548        let mut total_gc = 0;
549
550        for chain in data.values_mut() {
551            total_gc += chain.gc(min_ts);
552        }
553
554        total_gc
555    }
556}
557
#[cfg(test)]
#[allow(deprecated)] // these tests exercise the deprecated in-memory TransactionManager on purpose
mod tests {
    use super::*;

    #[test]
    fn test_basic_visibility() {
        let mgr = TransactionManager::new();

        // First transaction runs to completion (taking a snapshot along the way).
        let (first, _) = mgr.begin();
        let _first_snapshot = mgr.acquire_snapshot(first);
        mgr.commit(first);

        // A transaction that begins afterwards must see the first one's commit.
        let (second, _) = mgr.begin();
        let second_snapshot = mgr.acquire_snapshot(second);
        let first_commit_ts = mgr.get_commit_ts(first);
        assert!(second_snapshot.is_txn_visible(first, first_commit_ts));

        mgr.commit(second);
    }

    #[test]
    fn test_snapshot_isolation() {
        let mgr = Arc::new(TransactionManager::new());
        let store = MvccStore::new(Arc::clone(&mgr));

        let (writer, _) = mgr.begin();
        store.put(b"key1", "value1".to_string(), writer);
        mgr.commit(writer);

        let (reader, _) = mgr.begin();
        let reader_snapshot = mgr.acquire_snapshot(reader);

        // An update committed after the reader's snapshot must stay hidden from it.
        let (updater, _) = mgr.begin();
        store.put(b"key1", "value2".to_string(), updater);
        mgr.commit(updater);

        assert_eq!(store.get(b"key1", &reader_snapshot), Some("value1".to_string()));

        mgr.commit(reader);
    }

    #[test]
    fn test_version_chain() {
        let mgr = Arc::new(TransactionManager::new());
        let mut chain: VersionChain<String> = VersionChain::new();

        for value in ["v1", "v2"] {
            let (txn, _) = mgr.begin();
            chain.add_version(VersionInfo::new(txn, mgr.current_timestamp()), value.to_string());
            mgr.commit(txn);
        }
        assert_eq!(chain.version_count(), 2);

        // A snapshot taken after both commits sees the newest version.
        let (reader, _) = mgr.begin();
        let snapshot = mgr.acquire_snapshot(reader);
        assert_eq!(chain.get_visible(&snapshot, &mgr), Some(&"v2".to_string()));
    }

    #[test]
    #[ignore] // Ignored by default; run explicitly with: cargo test -- --ignored
    fn test_abort_not_visible() {
        let mgr = Arc::new(TransactionManager::new());
        let store = MvccStore::new(Arc::clone(&mgr));

        // The writer's insert is rolled back...
        let (writer, _) = mgr.begin();
        store.put(b"key1", "value1".to_string(), writer);
        mgr.abort(writer);

        // ...so a later reader must not observe it.
        let (reader, _) = mgr.begin();
        let reader_snapshot = mgr.acquire_snapshot(reader);
        assert_eq!(store.get(b"key1", &reader_snapshot), None);
    }
}
655
656// =============================================================================
657// Lock-Free MVCC (Task 2 Enhancement)
658// =============================================================================
659
660/// Lock-free transaction entry using epoch-based reclamation
661struct EpochTxnEntry {
662    txn_id: TxnId,
663    start_ts: Timestamp,
664    status: AtomicU64, // 0 = Active, 1+ = Committed(ts), u64::MAX = Aborted
665}
666
667impl EpochTxnEntry {
668    fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
669        Self {
670            txn_id,
671            start_ts,
672            status: AtomicU64::new(0), // Active
673        }
674    }
675
676    fn get_status(&self) -> TxnStatus {
677        let val = self.status.load(Ordering::Acquire);
678        if val == 0 {
679            TxnStatus::Active
680        } else if val == u64::MAX {
681            TxnStatus::Aborted
682        } else {
683            TxnStatus::Committed(val)
684        }
685    }
686
687    fn try_commit(&self, commit_ts: Timestamp) -> bool {
688        self.status
689            .compare_exchange(0, commit_ts, Ordering::AcqRel, Ordering::Acquire)
690            .is_ok()
691    }
692
693    fn try_abort(&self) -> bool {
694        self.status
695            .compare_exchange(0, u64::MAX, Ordering::AcqRel, Ordering::Acquire)
696            .is_ok()
697    }
698}
699
700/// Epoch node for lock-free linked list
701struct EpochNode {
702    entry: EpochTxnEntry,
703    next: Atomic<EpochNode>,
704}
705
706/// Lock-free MVCC manager using epoch-based reclamation
707///
708/// ## Performance Characteristics
709/// - Wait-free visibility checks (single atomic load)
710/// - Lock-free transaction begin/commit/abort
711/// - Epoch-based garbage collection (no synchronization for reads)
712///
713/// ## Memory Safety
714/// Uses crossbeam-epoch for safe concurrent memory reclamation:
715/// - Readers pin the current epoch
716/// - Writers defer destruction to future epochs
717/// - GC runs incrementally without blocking readers
718pub struct LockFreeMvccManager {
719    /// Next transaction ID
720    next_txn_id: AtomicU64,
721    /// Global timestamp counter
722    timestamp: AtomicU64,
723    /// Head of active transaction list
724    active_head: Atomic<EpochNode>,
725    /// Committed transactions for visibility (sorted by commit_ts)
726    committed: crossbeam_skiplist::SkipMap<TxnId, Timestamp>,
727    /// Minimum visible timestamp (for GC)
728    min_visible_ts: AtomicU64,
729    /// Number of active transactions
730    active_count: AtomicU64,
731}
732
733impl LockFreeMvccManager {
734    /// Create a new lock-free MVCC manager
735    pub fn new() -> Self {
736        Self {
737            next_txn_id: AtomicU64::new(1),
738            timestamp: AtomicU64::new(1),
739            active_head: Atomic::null(),
740            committed: crossbeam_skiplist::SkipMap::new(),
741            min_visible_ts: AtomicU64::new(0),
742            active_count: AtomicU64::new(0),
743        }
744    }
745
746    /// Begin a new transaction (lock-free)
747    pub fn begin(&self) -> (TxnId, Timestamp) {
748        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
749        let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
750
751        let guard = epoch::pin();
752
753        // Create entry and insert at head (lock-free CAS loop)
754        let mut new_node = Owned::new(EpochNode {
755            entry: EpochTxnEntry::new(txn_id, start_ts),
756            next: Atomic::null(),
757        });
758
759        loop {
760            let head = self.active_head.load(Ordering::Acquire, &guard);
761            new_node.next.store(head, Ordering::Release);
762
763            match self.active_head.compare_exchange(
764                head,
765                new_node,
766                Ordering::AcqRel,
767                Ordering::Acquire,
768                &guard,
769            ) {
770                Ok(_) => {
771                    self.active_count.fetch_add(1, Ordering::Relaxed);
772                    break;
773                }
774                Err(e) => {
775                    // CAS failed, get ownership back and retry
776                    new_node = e.new;
777                }
778            }
779        }
780
781        (txn_id, start_ts)
782    }
783
784    /// Commit a transaction (lock-free CAS)
785    pub fn commit(&self, txn_id: TxnId) -> Option<Timestamp> {
786        let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
787
788        // Find the transaction entry
789        let guard = epoch::pin();
790        let mut current = self.active_head.load(Ordering::Acquire, &guard);
791
792        while let Some(node) = unsafe { current.as_ref() } {
793            if node.entry.txn_id == txn_id {
794                if node.entry.try_commit(commit_ts) {
795                    // Record in committed set
796                    self.committed.insert(txn_id, commit_ts);
797                    self.active_count.fetch_sub(1, Ordering::Relaxed);
798                    return Some(commit_ts);
799                } else {
800                    return None; // Already committed or aborted
801                }
802            }
803            current = node.next.load(Ordering::Acquire, &guard);
804        }
805
806        None // Transaction not found
807    }
808
809    /// Abort a transaction (lock-free)
810    pub fn abort(&self, txn_id: TxnId) -> bool {
811        let guard = epoch::pin();
812        let mut current = self.active_head.load(Ordering::Acquire, &guard);
813
814        while let Some(node) = unsafe { current.as_ref() } {
815            if node.entry.txn_id == txn_id {
816                let success = node.entry.try_abort();
817                if success {
818                    self.active_count.fetch_sub(1, Ordering::Relaxed);
819                }
820                return success;
821            }
822            current = node.next.load(Ordering::Acquire, &guard);
823        }
824
825        false
826    }
827
828    /// Get transaction status (wait-free read)
829    pub fn get_status(&self, txn_id: TxnId) -> Option<TxnStatus> {
830        // First check committed set (fast path)
831        if let Some(entry) = self.committed.get(&txn_id) {
832            return Some(TxnStatus::Committed(*entry.value()));
833        }
834
835        // Search active list
836        let guard = epoch::pin();
837        let mut current = self.active_head.load(Ordering::Acquire, &guard);
838
839        while let Some(node) = unsafe { current.as_ref() } {
840            if node.entry.txn_id == txn_id {
841                return Some(node.entry.get_status());
842            }
843            current = node.next.load(Ordering::Acquire, &guard);
844        }
845
846        None
847    }
848
849    /// Acquire a snapshot (wait-free for visibility checks)
850    pub fn acquire_snapshot(&self, txn_id: TxnId) -> Snapshot {
851        let guard = epoch::pin();
852
853        // Get start timestamp
854        let start_ts = {
855            let mut ts = self.timestamp.load(Ordering::SeqCst);
856            let mut current = self.active_head.load(Ordering::Acquire, &guard);
857
858            while let Some(node) = unsafe { current.as_ref() } {
859                if node.entry.txn_id == txn_id {
860                    ts = node.entry.start_ts;
861                    break;
862                }
863                current = node.next.load(Ordering::Acquire, &guard);
864            }
865            ts
866        };
867
868        // Collect active transactions
869        let mut active_set = HashSet::new();
870        let mut current = self.active_head.load(Ordering::Acquire, &guard);
871
872        while let Some(node) = unsafe { current.as_ref() } {
873            if node.entry.txn_id != txn_id && matches!(node.entry.get_status(), TxnStatus::Active) {
874                active_set.insert(node.entry.txn_id);
875            }
876            current = node.next.load(Ordering::Acquire, &guard);
877        }
878
879        Snapshot::new(txn_id, start_ts, active_set)
880    }
881
882    /// Get current timestamp
883    pub fn current_timestamp(&self) -> Timestamp {
884        self.timestamp.load(Ordering::SeqCst)
885    }
886
887    /// Get active transaction count
888    pub fn active_count(&self) -> u64 {
889        self.active_count.load(Ordering::Relaxed)
890    }
891
892    /// Epoch-based garbage collection
893    ///
894    /// Removes transaction entries that are:
895    /// 1. Committed before the watermark
896    /// 2. No longer needed for any active snapshot
897    pub fn gc(&self, watermark: Timestamp) -> usize {
898        self.min_visible_ts.store(watermark, Ordering::Release);
899
900        // Remove old committed entries
901        let mut removed = 0;
902        let entries_to_remove: Vec<_> = self
903            .committed
904            .iter()
905            .filter(|entry| *entry.value() < watermark)
906            .map(|entry| *entry.key())
907            .collect();
908
909        for txn_id in entries_to_remove {
910            if self.committed.remove(&txn_id).is_some() {
911                removed += 1;
912            }
913        }
914
915        // Clean up inactive nodes from the linked list
916        // This is done by epoch-based deferred destruction
917        let guard = epoch::pin();
918        let _prev: Option<&EpochNode> = None;
919        let mut current = self.active_head.load(Ordering::Acquire, &guard);
920
921        while let Some(node) = unsafe { current.as_ref() } {
922            let status = node.entry.get_status();
923            match status {
924                TxnStatus::Committed(ts) if ts < watermark => {
925                    // Node can be unlinked - but for simplicity, just mark and skip
926                    // Full list cleanup would require double-CAS or hazard pointers
927                }
928                TxnStatus::Aborted => {
929                    // Similar treatment for aborted
930                }
931                _ => {}
932            }
933            current = node.next.load(Ordering::Acquire, &guard);
934        }
935
936        // Advance the epoch to allow memory reclamation
937        drop(guard);
938        epoch::pin().flush();
939
940        removed
941    }
942}
943
944impl Default for LockFreeMvccManager {
945    fn default() -> Self {
946        Self::new()
947    }
948}
949
#[cfg(test)]
mod lock_free_tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_lock_free_basic() {
        let mgr = LockFreeMvccManager::new();

        let (txn, begin_ts) = mgr.begin();
        assert!(begin_ts > 0);
        assert_eq!(mgr.get_status(txn), Some(TxnStatus::Active));

        let commit_ts = mgr.commit(txn).unwrap();
        assert!(commit_ts > begin_ts);
        assert_eq!(mgr.get_status(txn), Some(TxnStatus::Committed(commit_ts)));
    }

    #[test]
    fn test_lock_free_abort() {
        let mgr = LockFreeMvccManager::new();

        let (txn, _) = mgr.begin();
        assert!(mgr.abort(txn));
        assert_eq!(mgr.get_status(txn), Some(TxnStatus::Aborted));

        // An aborted transaction can no longer commit.
        assert!(mgr.commit(txn).is_none());
    }

    #[test]
    fn test_lock_free_concurrent() {
        use std::sync::Arc;

        const THREADS: usize = 8;
        const TXNS_PER_THREAD: usize = 100;

        let mgr = Arc::new(LockFreeMvccManager::new());

        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let mgr = Arc::clone(&mgr);
                thread::spawn(move || {
                    for _ in 0..TXNS_PER_THREAD {
                        let (txn, _) = mgr.begin();
                        mgr.commit(txn);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // Every transaction begun above must have been recorded as committed.
        assert!(mgr.committed.len() >= THREADS * TXNS_PER_THREAD);
    }

    #[test]
    fn test_lock_free_snapshot() {
        let mgr = LockFreeMvccManager::new();

        let (done, _) = mgr.begin();
        mgr.commit(done);

        let (reader, _) = mgr.begin();
        let snapshot = mgr.acquire_snapshot(reader);

        // Neither the finished transaction nor the snapshot owner is listed as active.
        assert!(!snapshot.active_txns.contains(&done));
        assert!(!snapshot.active_txns.contains(&reader));
    }
}