Skip to main content

dbx_core/transaction/mvcc/
manager.rs

1use dashmap::DashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
/// A source of monotonically increasing logical timestamps for MVCC.
///
/// Each call to [`TimestampOracle::next`] hands out a fresh, unique timestamp
/// that transactions use for visibility checks and commit ordering.
///
/// - `read_ts`: The point in time a transaction reads from (snapshot).
/// - `commit_ts`: The point in time a transaction writes its changes.
#[derive(Debug)]
pub struct TimestampOracle {
    /// The most recently issued timestamp, or the seed passed to `new` when
    /// nothing has been issued yet. Despite the field name, the next
    /// timestamp handed out is this value plus one.
    next_ts: AtomicU64,
}

impl TimestampOracle {
    /// Create a new `TimestampOracle` seeded with `start_ts`.
    ///
    /// The seed itself is never returned by [`next`](Self::next): the first
    /// allocated timestamp is `start_ts + 1`. Until then, [`read`](Self::read)
    /// reports `start_ts`.
    pub fn new(start_ts: u64) -> Self {
        Self {
            next_ts: AtomicU64::new(start_ts),
        }
    }

    /// Allocate and return the next timestamp.
    ///
    /// Returns the NEW value after incrementing, i.e. one more than the value
    /// [`read`](Self::read) would have reported just before this call.
    ///
    /// # Panics
    ///
    /// Panics when the `u64` timestamp space is exhausted (the counter was
    /// already `u64::MAX`). The atomic increment itself wraps silently, so
    /// without this check a release build would hand out `0` and then start
    /// re-issuing old timestamps. After such a panic the internal counter has
    /// already wrapped and the oracle must not be used any further.
    pub fn next(&self) -> u64 {
        let previous = self.next_ts.fetch_add(1, Ordering::SeqCst);
        previous
            .checked_add(1)
            .expect("TimestampOracle exhausted: u64 timestamp space overflowed")
    }

    /// Read the current timestamp without incrementing (e.g., for status checks).
    ///
    /// This is the most recently issued timestamp, or the seed if none has
    /// been issued yet.
    pub fn read(&self) -> u64 {
        self.next_ts.load(Ordering::SeqCst)
    }
}

impl Default for TimestampOracle {
    /// Seed the oracle with 1, so the first allocated timestamp is 2.
    fn default() -> Self {
        Self::new(1)
    }
}
43
44/// Managing active transactions and cleanup.
45#[derive(Debug)]
46pub struct TransactionManager {
47    oracle: Arc<TimestampOracle>,
48    /// Active transactions: tx_id -> read_ts
49    active_txs: Arc<DashMap<u64, u64>>,
50}
51
52impl Default for TransactionManager {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl TransactionManager {
59    pub fn new() -> Self {
60        Self {
61            oracle: Arc::new(TimestampOracle::default()),
62            active_txs: Arc::new(DashMap::new()),
63        }
64    }
65
66    /// Start a new transaction, allocating a read timestamp.
67    pub fn begin_transaction(&self) -> u64 {
68        let read_ts = self.oracle.next();
69        let tx_id = read_ts; // Use read_ts as tx_id for simplicity
70        self.active_txs.insert(tx_id, read_ts);
71        read_ts
72    }
73
74    /// Allocate a commit timestamp.
75    pub fn allocate_commit_ts(&self) -> u64 {
76        self.oracle.next()
77    }
78
79    /// Read the current max timestamp (for non-transactional reads).
80    pub fn current_ts(&self) -> u64 {
81        self.oracle.read()
82    }
83
84    /// Mark a transaction as completed (committed or rolled back).
85    pub fn end_transaction(&self, tx_id: u64) {
86        self.active_txs.remove(&tx_id);
87    }
88
89    /// Get the minimum active read timestamp.
90    ///
91    /// This is the watermark below which versions can be safely garbage collected.
92    /// Returns None if there are no active transactions.
93    pub fn min_active_ts(&self) -> Option<u64> {
94        self.active_txs.iter().map(|entry| *entry.value()).min()
95    }
96
97    /// Get the number of active transactions.
98    pub fn active_count(&self) -> usize {
99        self.active_txs.len()
100    }
101}