1use crate::write_set::{WriteSet, WriteSetApplicator};
2use contextdb_core::{AtomicLsn, AtomicTxId, Error, Lsn, Result, RowId, SnapshotId, TxId};
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use std::sync::atomic::Ordering;
6
/// Coordinates transaction ids, LSN allocation, and per-transaction write
/// sets on top of a store `S` that knows how to apply a [`WriteSet`].
pub struct TxManager<S: WriteSetApplicator> {
    /// Id that the next call to `begin` will hand out.
    next_tx: AtomicTxId,
    /// Highest transaction id recorded by a successful commit or by
    /// `advance_for_sync`; `snapshot` and `current_tx_max` read it.
    committed_watermark: AtomicTxId,
    /// LSN that the next commit or `allocate_ddl_lsn` call will receive.
    next_lsn: AtomicLsn,
    /// Write set of every transaction that has begun and has not yet been
    /// committed or rolled back, keyed by transaction id.
    active_txs: Mutex<HashMap<TxId, WriteSet>>,
    /// Held by `commit_with_lsn`, `allocate_ddl_lsn`, and `with_commit_lock`
    /// so that those operations never overlap.
    commit_mutex: Mutex<()>,
    /// Backing store that committed write sets are applied to.
    store: S,
}
15
16impl<S: WriteSetApplicator> TxManager<S> {
17 pub fn new(store: S) -> Self {
18 Self::new_with_counters(store, TxId(1), Lsn(1), TxId(0))
19 }
20
21 pub fn new_with_counters(
22 store: S,
23 next_tx: TxId,
24 next_lsn: Lsn,
25 committed_watermark: TxId,
26 ) -> Self {
27 Self {
28 next_tx: AtomicTxId::new(next_tx),
29 committed_watermark: AtomicTxId::new(committed_watermark),
30 next_lsn: AtomicLsn::new(next_lsn),
31 active_txs: Mutex::new(HashMap::new()),
32 commit_mutex: Mutex::new(()),
33 store,
34 }
35 }
36
37 pub fn begin(&self) -> TxId {
38 let tx_id = self.next_tx.fetch_add(1, Ordering::SeqCst);
39 let mut active = self.active_txs.lock();
40 active.insert(tx_id, WriteSet::new());
41 tx_id
42 }
43
44 pub fn snapshot(&self) -> SnapshotId {
45 SnapshotId::from_tx(self.committed_watermark.load(Ordering::SeqCst))
46 }
47
48 pub fn with_write_set<F, R>(&self, tx: TxId, f: F) -> Result<R>
49 where
50 F: FnOnce(&mut WriteSet) -> R,
51 {
52 let mut active = self.active_txs.lock();
53 let ws = active.get_mut(&tx).ok_or(Error::TxNotFound(tx))?;
54 Ok(f(ws))
55 }
56
57 pub fn cloned_write_set(&self, tx: TxId) -> Result<WriteSet> {
58 let active = self.active_txs.lock();
59 active.get(&tx).cloned().ok_or(Error::TxNotFound(tx))
60 }
61
62 pub fn commit(&self, tx: TxId) -> Result<()> {
63 self.commit_with_lsn(tx).map(|_| ())
64 }
65
66 pub fn commit_with_lsn(&self, tx: TxId) -> Result<Lsn> {
67 let _lock = self.commit_mutex.lock();
68 let mut ws = self.cloned_write_set(tx)?;
69 let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
70 ws.stamp_lsn(lsn);
71 self.store.apply(ws)?;
72 let mut active = self.active_txs.lock();
73 active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
74 self.committed_watermark.fetch_max(tx, Ordering::SeqCst);
75 Ok(lsn)
76 }
77
78 pub fn rollback(&self, tx: TxId) -> Result<()> {
79 let mut active = self.active_txs.lock();
80 active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
81 Ok(())
82 }
83
84 pub fn store(&self) -> &S {
85 &self.store
86 }
87
88 pub fn new_row_id(&self) -> RowId {
89 self.store.new_row_id()
90 }
91
92 pub fn current_lsn(&self) -> Lsn {
93 let raw = self.next_lsn.load(Ordering::SeqCst).0.saturating_sub(1);
94 Lsn(raw)
95 }
96
97 pub fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
98 where
99 F: FnOnce(Lsn) -> R,
100 {
101 let _lock = self.commit_mutex.lock();
102 let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
103 f(lsn)
104 }
105
106 pub fn with_commit_lock<F, R>(&self, f: F) -> R
107 where
108 F: FnOnce() -> R,
109 {
110 let _lock = self.commit_mutex.lock();
111 f()
112 }
113
114 #[inline]
117 pub fn current_tx_max(&self) -> TxId {
118 self.committed_watermark.load(Ordering::SeqCst)
119 }
120
121 #[inline]
123 pub fn peek_next_tx(&self) -> TxId {
124 self.next_tx.load(Ordering::SeqCst)
125 }
126
127 pub fn advance_for_sync(&self, table: &str, incoming: TxId) -> Result<()> {
131 if incoming.0 == u64::MAX {
132 return Err(Error::TxIdOverflow {
133 table: table.to_string(),
134 incoming: u64::MAX,
135 });
136 }
137 self.next_tx
138 .fetch_max(TxId(incoming.0 + 1), Ordering::SeqCst);
139 self.committed_watermark
140 .fetch_max(incoming, Ordering::SeqCst);
141 Ok(())
142 }
143}
144
145impl<S: WriteSetApplicator> contextdb_core::TransactionManager for TxManager<S> {
146 fn begin(&self) -> TxId {
147 TxManager::begin(self)
148 }
149
150 fn commit(&self, tx: TxId) -> Result<()> {
151 TxManager::commit(self, tx)
152 }
153
154 fn rollback(&self, tx: TxId) -> Result<()> {
155 TxManager::rollback(self, tx)
156 }
157
158 fn snapshot(&self) -> SnapshotId {
159 TxManager::snapshot(self)
160 }
161}