// aurora_db/transaction.rs

1use crate::error::{AuroraError, Result};
2use dashmap::DashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
/// Process-wide source of transaction ids; the first id handed out is 1.
static TRANSACTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Opaque identifier for a transaction, backed by a `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TransactionId(u64);

impl TransactionId {
    /// Allocates the next id from the global counter.
    ///
    /// Every call bumps the counter by one, so ids returned by `new` are
    /// distinct from each other for the lifetime of the process (barring
    /// `u64` wrap-around).
    pub fn new() -> Self {
        Self(TRANSACTION_ID_COUNTER.fetch_add(1, Ordering::SeqCst))
    }

    /// Wraps an existing raw value without touching the global counter.
    ///
    /// No uniqueness check is performed, so the result may equal an id
    /// that `new` has issued or will issue.
    pub fn from_u64(id: u64) -> Self {
        Self(id)
    }

    /// Returns the raw numeric value of this id.
    pub fn as_u64(&self) -> u64 {
        self.0
    }
}

impl Default for TransactionId {
    /// Same as [`TransactionId::new`]: allocates a fresh id from the counter.
    fn default() -> Self {
        Self::new()
    }
}
24
25#[derive(Debug, Clone)]
26pub struct TransactionBuffer {
27    pub id: TransactionId,
28    pub writes: DashMap<String, Vec<u8>>,
29    pub deletes: DashMap<String, ()>,
30}
31
32impl TransactionBuffer {
33    pub fn new(id: TransactionId) -> Self {
34        Self {
35            id,
36            writes: DashMap::new(),
37            deletes: DashMap::new(),
38        }
39    }
40
41    pub fn write(&self, key: String, value: Vec<u8>) {
42        self.deletes.remove(&key);
43        self.writes.insert(key, value);
44    }
45
46    pub fn delete(&self, key: String) {
47        self.writes.remove(&key);
48        self.deletes.insert(key, ());
49    }
50
51    pub fn is_empty(&self) -> bool {
52        self.writes.is_empty() && self.deletes.is_empty()
53    }
54}
55
56pub struct TransactionManager {
57    pub active_transactions: Arc<DashMap<TransactionId, Arc<TransactionBuffer>>>,
58}
59
60impl TransactionManager {
61    pub fn new() -> Self {
62        Self {
63            active_transactions: Arc::new(DashMap::new()),
64        }
65    }
66
67    pub fn begin(&self) -> Arc<TransactionBuffer> {
68        let tx_id = TransactionId::new();
69        let buffer = Arc::new(TransactionBuffer::new(tx_id));
70        self.active_transactions.insert(tx_id, Arc::clone(&buffer));
71        buffer
72    }
73
74    pub fn commit(&self, tx_id: TransactionId) -> Result<()> {
75        if !self.active_transactions.contains_key(&tx_id) {
76            return Err(AuroraError::InvalidOperation(
77                "Transaction not found or already committed".into(),
78            ));
79        }
80
81        self.active_transactions.remove(&tx_id);
82        Ok(())
83    }
84
85    pub fn rollback(&self, tx_id: TransactionId) -> Result<()> {
86        if !self.active_transactions.contains_key(&tx_id) {
87            return Err(AuroraError::InvalidOperation(
88                "Transaction not found or already rolled back".into(),
89            ));
90        }
91
92        self.active_transactions.remove(&tx_id);
93        Ok(())
94    }
95
96    pub fn is_active(&self, tx_id: TransactionId) -> bool {
97        self.active_transactions.contains_key(&tx_id)
98    }
99
100    pub fn active_count(&self) -> usize {
101        self.active_transactions.len()
102    }
103}
104
105impl Clone for TransactionManager {
106    fn clone(&self) -> Self {
107        Self {
108            active_transactions: Arc::clone(&self.active_transactions),
109        }
110    }
111}
112
113impl Default for TransactionManager {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Two transactions buffer different values for the same key without
    /// seeing each other's write, and finishing each one shrinks the
    /// manager's active set by one.
    #[test]
    fn test_transaction_isolation() {
        let manager = TransactionManager::new();

        let first = manager.begin();
        let second = manager.begin();

        assert_ne!(first.id, second.id);
        assert_eq!(manager.active_count(), 2);

        // Each transaction gets its own payload for the shared key.
        let cases = [
            (&first, &b"value1"[..]),
            (&second, &b"value2"[..]),
        ];

        for (tx, payload) in cases {
            tx.write("key1".to_string(), payload.to_vec());
        }

        for (tx, payload) in cases {
            assert_eq!(tx.writes.get("key1").unwrap().as_slice(), payload);
        }

        manager.commit(first.id).unwrap();
        assert_eq!(manager.active_count(), 1);

        manager.rollback(second.id).unwrap();
        assert_eq!(manager.active_count(), 0);
    }
}
145}