//! Transaction manager for `contextdb_tx` (`manager.rs`).

1use crate::write_set::{WriteSet, WriteSetApplicator};
2use contextdb_core::{AtomicLsn, AtomicTxId, Error, Lsn, Result, RowId, SnapshotId, TxId};
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use std::sync::atomic::Ordering;
6
/// Coordinates transaction lifecycle (begin / commit / rollback), LSN
/// allocation, and per-transaction write-set buffering on top of a
/// `WriteSetApplicator` store.
pub struct TxManager<S: WriteSetApplicator> {
    /// Next TxId handed out by `begin` (monotonically incremented).
    next_tx: AtomicTxId,
    /// Highest TxId known to be committed; `snapshot` reads from this.
    committed_watermark: AtomicTxId,
    /// Next log sequence number to stamp onto a committing write set.
    next_lsn: AtomicLsn,
    /// Write sets of in-flight transactions, keyed by TxId.
    active_txs: Mutex<HashMap<TxId, WriteSet>>,
    /// Serializes commits and DDL LSN allocation so that LSN order and
    /// store-apply order agree (held across both in `commit_with_lsn`).
    commit_mutex: Mutex<()>,
    /// Backing store that applies committed write sets.
    store: S,
}
15
16impl<S: WriteSetApplicator> TxManager<S> {
17    pub fn new(store: S) -> Self {
18        Self::new_with_counters(store, TxId(1), Lsn(1), TxId(0))
19    }
20
21    pub fn new_with_counters(
22        store: S,
23        next_tx: TxId,
24        next_lsn: Lsn,
25        committed_watermark: TxId,
26    ) -> Self {
27        Self {
28            next_tx: AtomicTxId::new(next_tx),
29            committed_watermark: AtomicTxId::new(committed_watermark),
30            next_lsn: AtomicLsn::new(next_lsn),
31            active_txs: Mutex::new(HashMap::new()),
32            commit_mutex: Mutex::new(()),
33            store,
34        }
35    }
36
37    pub fn begin(&self) -> TxId {
38        let tx_id = self.next_tx.fetch_add(1, Ordering::SeqCst);
39        let mut active = self.active_txs.lock();
40        active.insert(tx_id, WriteSet::new());
41        tx_id
42    }
43
44    pub fn snapshot(&self) -> SnapshotId {
45        SnapshotId::from_tx(self.committed_watermark.load(Ordering::SeqCst))
46    }
47
48    pub fn with_write_set<F, R>(&self, tx: TxId, f: F) -> Result<R>
49    where
50        F: FnOnce(&mut WriteSet) -> R,
51    {
52        let mut active = self.active_txs.lock();
53        let ws = active.get_mut(&tx).ok_or(Error::TxNotFound(tx))?;
54        Ok(f(ws))
55    }
56
57    pub fn cloned_write_set(&self, tx: TxId) -> Result<WriteSet> {
58        let active = self.active_txs.lock();
59        active.get(&tx).cloned().ok_or(Error::TxNotFound(tx))
60    }
61
62    pub fn commit(&self, tx: TxId) -> Result<()> {
63        self.commit_with_lsn(tx).map(|_| ())
64    }
65
66    pub fn commit_with_lsn(&self, tx: TxId) -> Result<Lsn> {
67        let _lock = self.commit_mutex.lock();
68        let mut ws = self.cloned_write_set(tx)?;
69        let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
70        ws.stamp_lsn(lsn);
71        self.store.apply(ws)?;
72        let mut active = self.active_txs.lock();
73        active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
74        self.committed_watermark.fetch_max(tx, Ordering::SeqCst);
75        Ok(lsn)
76    }
77
78    pub fn rollback(&self, tx: TxId) -> Result<()> {
79        let mut active = self.active_txs.lock();
80        active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
81        Ok(())
82    }
83
84    pub fn store(&self) -> &S {
85        &self.store
86    }
87
88    pub fn new_row_id(&self) -> RowId {
89        self.store.new_row_id()
90    }
91
92    pub fn current_lsn(&self) -> Lsn {
93        let raw = self.next_lsn.load(Ordering::SeqCst).0.saturating_sub(1);
94        Lsn(raw)
95    }
96
97    pub fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
98    where
99        F: FnOnce(Lsn) -> R,
100    {
101        let _lock = self.commit_mutex.lock();
102        let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
103        f(lsn)
104    }
105
106    pub fn with_commit_lock<F, R>(&self, f: F) -> R
107    where
108        F: FnOnce() -> R,
109    {
110        let _lock = self.commit_mutex.lock();
111        f()
112    }
113
114    /// Returns the highest-committed TxId (the statement-scoped max for user-SQL
115    /// bound checks against TXID columns). Reads `committed_watermark` under `SeqCst`.
116    #[inline]
117    pub fn current_tx_max(&self) -> TxId {
118        self.committed_watermark.load(Ordering::SeqCst)
119    }
120
121    /// Returns the next transaction id the allocator will issue (peek, no increment).
122    #[inline]
123    pub fn peek_next_tx(&self) -> TxId {
124        self.next_tx.load(Ordering::SeqCst)
125    }
126
127    /// Advances the local TxId allocator past an incoming peer TxId seen during
128    /// sync-apply. The overflow guard fires on `u64::MAX` before any mutation so
129    /// the receiver's counters are unchanged on error.
130    pub fn advance_for_sync(&self, table: &str, incoming: TxId) -> Result<()> {
131        if incoming.0 == u64::MAX {
132            return Err(Error::TxIdOverflow {
133                table: table.to_string(),
134                incoming: u64::MAX,
135            });
136        }
137        self.next_tx
138            .fetch_max(TxId(incoming.0 + 1), Ordering::SeqCst);
139        self.committed_watermark
140            .fetch_max(incoming, Ordering::SeqCst);
141        Ok(())
142    }
143}
144
145impl<S: WriteSetApplicator> contextdb_core::TransactionManager for TxManager<S> {
146    fn begin(&self) -> TxId {
147        TxManager::begin(self)
148    }
149
150    fn commit(&self, tx: TxId) -> Result<()> {
151        TxManager::commit(self, tx)
152    }
153
154    fn rollback(&self, tx: TxId) -> Result<()> {
155        TxManager::rollback(self, tx)
156    }
157
158    fn snapshot(&self) -> SnapshotId {
159        TxManager::snapshot(self)
160    }
161}