dbx_core/transaction/mvcc/
manager.rs1use dashmap::DashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
/// Logical clock that issues strictly increasing `u64` timestamps.
///
/// Every method takes `&self` and operates on a single atomic counter.
#[derive(Debug)]
pub struct TimestampOracle {
    /// Last timestamp handed out, or the seed value if `next` has not been called yet.
    next_ts: AtomicU64,
}

impl TimestampOracle {
    /// Creates an oracle seeded with `start_ts`.
    ///
    /// The seed itself is never issued: the first call to [`next`](Self::next) returns
    /// `start_ts + 1`.
    pub fn new(start_ts: u64) -> Self {
        let next_ts = AtomicU64::new(start_ts);
        Self { next_ts }
    }

    /// Advances the clock by one tick and returns the freshly issued timestamp.
    pub fn next(&self) -> u64 {
        // `fetch_add` yields the value held *before* the increment, so add one to
        // report the tick this call actually claimed.
        let previous = self.next_ts.fetch_add(1, Ordering::SeqCst);
        previous + 1
    }

    /// Returns the current clock value without advancing it.
    pub fn read(&self) -> u64 {
        self.next_ts.load(Ordering::SeqCst)
    }
}

impl Default for TimestampOracle {
    /// Builds an oracle seeded at 1, so the first issued timestamp is 2.
    fn default() -> Self {
        Self::new(1)
    }
}
43
44#[derive(Debug)]
46pub struct TransactionManager {
47 oracle: Arc<TimestampOracle>,
48 active_txs: Arc<DashMap<u64, u64>>,
50}
51
52impl Default for TransactionManager {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl TransactionManager {
59 pub fn new() -> Self {
60 Self {
61 oracle: Arc::new(TimestampOracle::default()),
62 active_txs: Arc::new(DashMap::new()),
63 }
64 }
65
66 pub fn begin_transaction(&self) -> u64 {
68 let read_ts = self.oracle.next();
69 let tx_id = read_ts; self.active_txs.insert(tx_id, read_ts);
71 read_ts
72 }
73
74 pub fn allocate_commit_ts(&self) -> u64 {
76 self.oracle.next()
77 }
78
79 pub fn current_ts(&self) -> u64 {
81 self.oracle.read()
82 }
83
84 pub fn end_transaction(&self, tx_id: u64) {
86 self.active_txs.remove(&tx_id);
87 }
88
89 pub fn min_active_ts(&self) -> Option<u64> {
94 self.active_txs.iter().map(|entry| *entry.value()).min()
95 }
96
97 pub fn active_count(&self) -> usize {
99 self.active_txs.len()
100 }
101}