Skip to main content

contextdb_tx/
manager.rs

1use crate::write_set::{WriteSet, WriteSetApplicator};
2use contextdb_core::{Error, Result, RowId, SnapshotId, TxId};
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, Ordering};
6
7pub struct TxManager<S: WriteSetApplicator> {
8    next_tx: AtomicU64,
9    committed_watermark: AtomicU64,
10    next_lsn: AtomicU64,
11    active_txs: Mutex<HashMap<TxId, WriteSet>>,
12    commit_mutex: Mutex<()>,
13    store: S,
14}
15
16impl<S: WriteSetApplicator> TxManager<S> {
17    pub fn new(store: S) -> Self {
18        Self::new_with_counters(store, 1, 1, 0)
19    }
20
21    pub fn new_with_counters(
22        store: S,
23        next_tx: TxId,
24        next_lsn: u64,
25        committed_watermark: TxId,
26    ) -> Self {
27        Self {
28            next_tx: AtomicU64::new(next_tx),
29            committed_watermark: AtomicU64::new(committed_watermark),
30            next_lsn: AtomicU64::new(next_lsn),
31            active_txs: Mutex::new(HashMap::new()),
32            commit_mutex: Mutex::new(()),
33            store,
34        }
35    }
36
37    pub fn begin(&self) -> TxId {
38        let tx_id = self.next_tx.fetch_add(1, Ordering::SeqCst);
39        let mut active = self.active_txs.lock();
40        active.insert(tx_id, WriteSet::new());
41        tx_id
42    }
43
44    pub fn snapshot(&self) -> SnapshotId {
45        self.committed_watermark.load(Ordering::SeqCst)
46    }
47
48    pub fn with_write_set<F, R>(&self, tx: TxId, f: F) -> Result<R>
49    where
50        F: FnOnce(&mut WriteSet) -> R,
51    {
52        let mut active = self.active_txs.lock();
53        let ws = active.get_mut(&tx).ok_or(Error::TxNotFound(tx))?;
54        Ok(f(ws))
55    }
56
57    pub fn cloned_write_set(&self, tx: TxId) -> Result<WriteSet> {
58        let active = self.active_txs.lock();
59        active.get(&tx).cloned().ok_or(Error::TxNotFound(tx))
60    }
61
62    pub fn commit(&self, tx: TxId) -> Result<()> {
63        self.commit_with_lsn(tx).map(|_| ())
64    }
65
66    pub fn commit_with_lsn(&self, tx: TxId) -> Result<u64> {
67        let _lock = self.commit_mutex.lock();
68        let mut ws = self.cloned_write_set(tx)?;
69        let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
70        ws.stamp_lsn(lsn);
71        self.store.apply(ws)?;
72        let mut active = self.active_txs.lock();
73        active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
74        self.committed_watermark.fetch_max(tx, Ordering::SeqCst);
75        Ok(lsn)
76    }
77
78    pub fn rollback(&self, tx: TxId) -> Result<()> {
79        let mut active = self.active_txs.lock();
80        active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
81        Ok(())
82    }
83
84    pub fn store(&self) -> &S {
85        &self.store
86    }
87
88    pub fn new_row_id(&self) -> RowId {
89        self.store.new_row_id()
90    }
91
92    pub fn current_lsn(&self) -> u64 {
93        self.next_lsn.load(Ordering::SeqCst).saturating_sub(1)
94    }
95
96    pub fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
97    where
98        F: FnOnce(u64) -> R,
99    {
100        let _lock = self.commit_mutex.lock();
101        let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
102        f(lsn)
103    }
104
105    pub fn with_commit_lock<F, R>(&self, f: F) -> R
106    where
107        F: FnOnce() -> R,
108    {
109        let _lock = self.commit_mutex.lock();
110        f()
111    }
112}
113
114impl<S: WriteSetApplicator> contextdb_core::TransactionManager for TxManager<S> {
115    fn begin(&self) -> TxId {
116        TxManager::begin(self)
117    }
118
119    fn commit(&self, tx: TxId) -> Result<()> {
120        TxManager::commit(self, tx)
121    }
122
123    fn rollback(&self, tx: TxId) -> Result<()> {
124        TxManager::rollback(self, tx)
125    }
126
127    fn snapshot(&self) -> SnapshotId {
128        TxManager::snapshot(self)
129    }
130}