1use crate::error::{AuroraError, Result};
2use dashmap::DashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
/// Process-wide source of fresh transaction ids; the first id handed out is 1.
static TRANSACTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Opaque numeric handle identifying one transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransactionId(u64);

impl TransactionId {
    /// Allocates the next id from the global counter.
    ///
    /// Every call advances the counter by one, so ids produced here are
    /// distinct from each other until the `u64` counter wraps around.
    pub fn new() -> Self {
        let next = TRANSACTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
        TransactionId(next)
    }

    /// Wraps a raw value without touching the global counter.
    ///
    /// The value is not checked against ids already issued by [`Self::new`].
    pub fn from_u64(id: u64) -> Self {
        TransactionId(id)
    }

    /// Returns the raw numeric value of this id.
    pub fn as_u64(&self) -> u64 {
        let TransactionId(raw) = *self;
        raw
    }
}
24
25#[derive(Debug, Clone)]
26pub struct TransactionBuffer {
27 pub id: TransactionId,
28 pub writes: DashMap<String, Vec<u8>>,
29 pub deletes: DashMap<String, ()>,
30}
31
32impl TransactionBuffer {
33 pub fn new(id: TransactionId) -> Self {
34 Self {
35 id,
36 writes: DashMap::new(),
37 deletes: DashMap::new(),
38 }
39 }
40
41 pub fn write(&self, key: String, value: Vec<u8>) {
42 self.deletes.remove(&key);
43 self.writes.insert(key, value);
44 }
45
46 pub fn delete(&self, key: String) {
47 self.writes.remove(&key);
48 self.deletes.insert(key, ());
49 }
50
51 pub fn is_empty(&self) -> bool {
52 self.writes.is_empty() && self.deletes.is_empty()
53 }
54}
55
56pub struct TransactionManager {
57 pub active_transactions: Arc<DashMap<TransactionId, Arc<TransactionBuffer>>>,
58}
59
60impl TransactionManager {
61 pub fn new() -> Self {
62 Self {
63 active_transactions: Arc::new(DashMap::new()),
64 }
65 }
66
67 pub fn begin(&self) -> Arc<TransactionBuffer> {
68 let tx_id = TransactionId::new();
69 let buffer = Arc::new(TransactionBuffer::new(tx_id));
70 self.active_transactions.insert(tx_id, Arc::clone(&buffer));
71 buffer
72 }
73
74 pub fn commit(&self, tx_id: TransactionId) -> Result<()> {
75 if !self.active_transactions.contains_key(&tx_id) {
76 return Err(AuroraError::InvalidOperation(
77 "Transaction not found or already committed".into(),
78 ));
79 }
80
81 self.active_transactions.remove(&tx_id);
82 Ok(())
83 }
84
85 pub fn rollback(&self, tx_id: TransactionId) -> Result<()> {
86 if !self.active_transactions.contains_key(&tx_id) {
87 return Err(AuroraError::InvalidOperation(
88 "Transaction not found or already rolled back".into(),
89 ));
90 }
91
92 self.active_transactions.remove(&tx_id);
93 Ok(())
94 }
95
96 pub fn is_active(&self, tx_id: TransactionId) -> bool {
97 self.active_transactions.contains_key(&tx_id)
98 }
99
100 pub fn active_count(&self) -> usize {
101 self.active_transactions.len()
102 }
103}
104
105impl Clone for TransactionManager {
106 fn clone(&self) -> Self {
107 Self {
108 active_transactions: Arc::clone(&self.active_transactions),
109 }
110 }
111}
112
113impl Default for TransactionManager {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Two open transactions keep separate write buffers and are removed
    /// from the active set independently by commit and rollback.
    #[test]
    fn test_transaction_isolation() {
        let manager = TransactionManager::new();

        let first = manager.begin();
        let second = manager.begin();

        // Each `begin` yields a distinct id and registers its buffer.
        assert_ne!(first.id, second.id);
        assert_eq!(manager.active_count(), 2);

        // Same key, different payload in each transaction.
        let key = "key1";
        first.write(key.to_string(), b"value1".to_vec());
        second.write(key.to_string(), b"value2".to_vec());

        assert_eq!(*first.writes.get(key).unwrap(), b"value1".to_vec());
        assert_eq!(*second.writes.get(key).unwrap(), b"value2".to_vec());

        // Closing one transaction leaves the other registered.
        manager.commit(first.id).unwrap();
        assert_eq!(manager.active_count(), 1);

        manager.rollback(second.id).unwrap();
        assert_eq!(manager.active_count(), 0);
    }
}