oxirs_core/transaction/
mod.rs1pub mod acid_transaction;
34pub mod named_graph;
35pub mod snapshot;
36pub mod wal;
37
38pub use acid_transaction::{AcidTransaction, TransactionId, TransactionState};
39pub use named_graph::{GraphStats, NamedGraphTransaction};
40pub use snapshot::{MvccSnapshot, SnapshotManager, VersionedQuad};
41pub use wal::{WalEntry, WalRecovery, WalValidation, WriteAheadLog};
42
43use crate::model::{GraphName, Quad};
44use crate::OxirsError;
45use ahash::AHashSet;
46use scirs2_core::metrics::{Counter, Timer};
47use std::path::Path;
48use std::sync::atomic::{AtomicU64, Ordering};
49use std::sync::{Arc, RwLock};
50
/// Isolation level requested when a transaction is started.
///
/// Within this module the level only decides whether
/// `TransactionManager::begin` captures an MVCC snapshot up front; any further
/// semantics are implemented by the transaction types themselves.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IsolationLevel {
    /// No snapshot is captured when the transaction begins.
    ReadUncommitted,
    /// No snapshot is captured when the transaction begins.
    ReadCommitted,
    /// No snapshot is captured when the transaction begins.
    RepeatableRead,
    /// A snapshot is captured from the snapshot manager when the transaction begins.
    Snapshot,
    /// A snapshot is captured from the snapshot manager when the transaction begins.
    Serializable,
}
65
66pub struct TransactionManager {
68 next_tx_id: Arc<AtomicU64>,
70 wal: Arc<RwLock<WriteAheadLog>>,
72 snapshot_mgr: Arc<RwLock<SnapshotManager>>,
74 active_transactions: Arc<RwLock<Vec<TransactionId>>>,
76 graph_locks: Arc<RwLock<AHashSet<GraphName>>>,
78 commit_counter: Arc<Counter>,
80 abort_counter: Arc<Counter>,
81 commit_timer: Arc<Timer>,
82}
83
84impl TransactionManager {
85 pub fn new(wal_dir: impl AsRef<Path>) -> Result<Self, OxirsError> {
87 let wal = WriteAheadLog::new(wal_dir)?;
88
89 Ok(Self {
90 next_tx_id: Arc::new(AtomicU64::new(1)),
91 wal: Arc::new(RwLock::new(wal)),
92 snapshot_mgr: Arc::new(RwLock::new(SnapshotManager::new())),
93 active_transactions: Arc::new(RwLock::new(Vec::new())),
94 graph_locks: Arc::new(RwLock::new(AHashSet::new())),
95 commit_counter: Arc::new(Counter::new("transaction.commits".to_string())),
96 abort_counter: Arc::new(Counter::new("transaction.aborts".to_string())),
97 commit_timer: Arc::new(Timer::new("transaction.commit_time".to_string())),
98 })
99 }
100
101 pub fn begin(&mut self, isolation: IsolationLevel) -> Result<AcidTransaction, OxirsError> {
103 let tx_id = TransactionId(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
104
105 let snapshot = match isolation {
107 IsolationLevel::Snapshot | IsolationLevel::Serializable => {
108 let mut snapshot_mgr = self
109 .snapshot_mgr
110 .write()
111 .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
112 Some(snapshot_mgr.create_snapshot(tx_id))
113 }
114 _ => None,
115 };
116
117 let mut active = self
119 .active_transactions
120 .write()
121 .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
122 active.push(tx_id);
123
124 Ok(AcidTransaction::new(
125 tx_id,
126 isolation,
127 snapshot,
128 self.wal.clone(),
129 self.commit_counter.clone(),
130 self.abort_counter.clone(),
131 self.commit_timer.clone(),
132 ))
133 }
134
135 pub fn begin_named_graph_transaction(
142 &mut self,
143 isolation: IsolationLevel,
144 ) -> Result<NamedGraphTransaction, OxirsError> {
145 let inner = self.begin(isolation)?;
146 Ok(NamedGraphTransaction::new(inner, self.graph_locks.clone()))
147 }
148
149 pub fn recover<F, G>(&mut self, insert_fn: F, delete_fn: G) -> Result<usize, OxirsError>
171 where
172 F: FnMut(Quad) -> Result<bool, OxirsError>,
173 G: FnMut(&Quad) -> Result<bool, OxirsError>,
174 {
175 let wal_guard = self
177 .wal
178 .read()
179 .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
180
181 let wal_path = wal_guard.path().to_path_buf();
183 drop(wal_guard);
184
185 let recovery = WalRecovery::new(&wal_path)?;
186 recovery.replay(insert_fn, delete_fn)
187 }
188
189 pub fn checkpoint(&mut self) -> Result<(), OxirsError> {
191 let mut wal = self
192 .wal
193 .write()
194 .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
195
196 wal.checkpoint()
197 }
198
199 pub fn stats(&self) -> TransactionStats {
201 TransactionStats {
202 total_commits: self.commit_counter.get(),
203 total_aborts: self.abort_counter.get(),
204 active_count: self
205 .active_transactions
206 .read()
207 .ok()
208 .map(|a| a.len())
209 .unwrap_or(0),
210 avg_commit_time_ms: {
211 let timer_stats = self.commit_timer.get_stats();
212 timer_stats.mean * 1000.0
213 },
214 }
215 }
216}
217
/// Metrics snapshot produced by `TransactionManager::stats`.
#[derive(Clone, Debug)]
pub struct TransactionStats {
    /// Current value of the commit counter.
    pub total_commits: u64,
    /// Current value of the abort counter.
    pub total_aborts: u64,
    /// Length of the manager's active-transaction list (0 if its lock is poisoned).
    pub active_count: usize,
    /// Mean reported by the commit timer multiplied by 1000
    /// (presumably seconds converted to milliseconds — confirm).
    pub avg_commit_time_ms: f64,
}
230
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Creates a scratch directory for the WAL, mapping I/O failures into `OxirsError`.
    fn scratch_dir() -> Result<tempfile::TempDir, OxirsError> {
        tempdir().map_err(|e| OxirsError::Io(e.to_string()))
    }

    /// A freshly created manager reports no commits and no aborts.
    #[test]
    fn test_transaction_manager_creation() -> Result<(), OxirsError> {
        let wal_dir = scratch_dir()?;
        let manager = TransactionManager::new(wal_dir.path())?;

        let TransactionStats {
            total_commits,
            total_aborts,
            ..
        } = manager.stats();
        assert_eq!(total_commits, 0);
        assert_eq!(total_aborts, 0);

        Ok(())
    }

    /// The first transaction started on a new manager receives id 1.
    #[test]
    fn test_begin_transaction() -> Result<(), OxirsError> {
        let wal_dir = scratch_dir()?;
        let mut manager = TransactionManager::new(wal_dir.path())?;

        let first_tx = manager.begin(IsolationLevel::Snapshot)?;
        assert_eq!(first_tx.id().0, 1);

        Ok(())
    }
}