1use crate::write_set::{WriteSet, WriteSetApplicator};
2use contextdb_core::{Error, Result, RowId, SnapshotId, TxId};
3use parking_lot::Mutex;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, Ordering};
6
7pub struct TxManager<S: WriteSetApplicator> {
8 next_tx: AtomicU64,
9 committed_watermark: AtomicU64,
10 next_lsn: AtomicU64,
11 active_txs: Mutex<HashMap<TxId, WriteSet>>,
12 commit_mutex: Mutex<()>,
13 store: S,
14}
15
16impl<S: WriteSetApplicator> TxManager<S> {
17 pub fn new(store: S) -> Self {
18 Self::new_with_counters(store, 1, 1, 0)
19 }
20
21 pub fn new_with_counters(
22 store: S,
23 next_tx: TxId,
24 next_lsn: u64,
25 committed_watermark: TxId,
26 ) -> Self {
27 Self {
28 next_tx: AtomicU64::new(next_tx),
29 committed_watermark: AtomicU64::new(committed_watermark),
30 next_lsn: AtomicU64::new(next_lsn),
31 active_txs: Mutex::new(HashMap::new()),
32 commit_mutex: Mutex::new(()),
33 store,
34 }
35 }
36
37 pub fn begin(&self) -> TxId {
38 let tx_id = self.next_tx.fetch_add(1, Ordering::SeqCst);
39 let mut active = self.active_txs.lock();
40 active.insert(tx_id, WriteSet::new());
41 tx_id
42 }
43
44 pub fn snapshot(&self) -> SnapshotId {
45 self.committed_watermark.load(Ordering::SeqCst)
46 }
47
48 pub fn with_write_set<F, R>(&self, tx: TxId, f: F) -> Result<R>
49 where
50 F: FnOnce(&mut WriteSet) -> R,
51 {
52 let mut active = self.active_txs.lock();
53 let ws = active.get_mut(&tx).ok_or(Error::TxNotFound(tx))?;
54 Ok(f(ws))
55 }
56
57 pub fn cloned_write_set(&self, tx: TxId) -> Result<WriteSet> {
58 let active = self.active_txs.lock();
59 active.get(&tx).cloned().ok_or(Error::TxNotFound(tx))
60 }
61
62 pub fn commit(&self, tx: TxId) -> Result<()> {
63 self.commit_with_lsn(tx).map(|_| ())
64 }
65
66 pub fn commit_with_lsn(&self, tx: TxId) -> Result<u64> {
67 let _lock = self.commit_mutex.lock();
68 let mut ws = self.cloned_write_set(tx)?;
69 let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
70 ws.stamp_lsn(lsn);
71 self.store.apply(ws)?;
72 let mut active = self.active_txs.lock();
73 active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
74 self.committed_watermark.fetch_max(tx, Ordering::SeqCst);
75 Ok(lsn)
76 }
77
78 pub fn rollback(&self, tx: TxId) -> Result<()> {
79 let mut active = self.active_txs.lock();
80 active.remove(&tx).ok_or(Error::TxNotFound(tx))?;
81 Ok(())
82 }
83
84 pub fn store(&self) -> &S {
85 &self.store
86 }
87
88 pub fn new_row_id(&self) -> RowId {
89 self.store.new_row_id()
90 }
91
92 pub fn current_lsn(&self) -> u64 {
93 self.next_lsn.load(Ordering::SeqCst).saturating_sub(1)
94 }
95
96 pub fn allocate_ddl_lsn<F, R>(&self, f: F) -> R
97 where
98 F: FnOnce(u64) -> R,
99 {
100 let _lock = self.commit_mutex.lock();
101 let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
102 f(lsn)
103 }
104
105 pub fn with_commit_lock<F, R>(&self, f: F) -> R
106 where
107 F: FnOnce() -> R,
108 {
109 let _lock = self.commit_mutex.lock();
110 f()
111 }
112}
113
114impl<S: WriteSetApplicator> contextdb_core::TransactionManager for TxManager<S> {
115 fn begin(&self) -> TxId {
116 TxManager::begin(self)
117 }
118
119 fn commit(&self, tx: TxId) -> Result<()> {
120 TxManager::commit(self, tx)
121 }
122
123 fn rollback(&self, tx: TxId) -> Result<()> {
124 TxManager::rollback(self, tx)
125 }
126
127 fn snapshot(&self) -> SnapshotId {
128 TxManager::snapshot(self)
129 }
130}