oxirs_core/transaction/
mod.rs

1//! ACID transaction support with Write-Ahead Logging
2//!
3//! This module provides full ACID (Atomicity, Consistency, Isolation, Durability)
4//! transaction support for RDF operations using:
5//!
6//! - **Atomicity**: All-or-nothing commit semantics
7//! - **Consistency**: Validation before commit
8//! - **Isolation**: Snapshot isolation with MVCC
9//! - **Durability**: Write-Ahead Logging (WAL) for crash recovery
10//!
11//! # Example
12//!
13//! ```rust,no_run
14//! use oxirs_core::transaction::{TransactionManager, IsolationLevel};
15//! use oxirs_core::model::Quad;
16//!
17//! # fn example() -> Result<(), oxirs_core::OxirsError> {
18//! let mut tx_mgr = TransactionManager::new("./wal")?;
19//!
20//! // Begin a transaction with snapshot isolation
21//! let mut tx = tx_mgr.begin(IsolationLevel::Snapshot)?;
22//!
23//! // Perform operations
24//! // tx.insert(quad)?;
25//! // tx.remove(quad)?;
26//!
27//! // Commit with durability guarantee
28//! tx.commit()?;
29//! # Ok(())
30//! # }
31//! ```
32
33pub mod acid_transaction;
34pub mod named_graph;
35pub mod snapshot;
36pub mod wal;
37
38pub use acid_transaction::{AcidTransaction, TransactionId, TransactionState};
39pub use named_graph::{GraphStats, NamedGraphTransaction};
40pub use snapshot::{MvccSnapshot, SnapshotManager, VersionedQuad};
41pub use wal::{WalEntry, WalRecovery, WalValidation, WriteAheadLog};
42
43use crate::model::{GraphName, Quad};
44use crate::OxirsError;
45use ahash::AHashSet;
46use scirs2_core::metrics::{Counter, Timer};
47use std::path::Path;
48use std::sync::atomic::{AtomicU64, Ordering};
49use std::sync::{Arc, RwLock};
50
/// Isolation level requested when a transaction is started.
///
/// The level is passed to `TransactionManager::begin`. In this module only
/// `Snapshot` and `Serializable` cause an MVCC snapshot to be created for the
/// transaction; the weaker levels start without one.
///
/// The type is `Copy` and hashable, so it can be used directly as a key in
/// hash-based collections (for example per-level metrics).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IsolationLevel {
    /// Read uncommitted (no isolation)
    ReadUncommitted,
    /// Read committed (prevents dirty reads)
    ReadCommitted,
    /// Repeatable read (prevents non-repeatable reads)
    RepeatableRead,
    /// Snapshot isolation (MVCC-based).
    ///
    /// Reads observe a consistent snapshot. Note that snapshot isolation, by
    /// definition, does not rule out every anomaly: write skew is still
    /// possible. Choose `Serializable` when that matters.
    Snapshot,
    /// Serializable (strongest isolation)
    // NOTE(review): `TransactionManager::begin` gives this level the same
    // snapshot as `Snapshot`; confirm the extra conflict checks live in
    // `AcidTransaction`.
    Serializable,
}
65
66/// Transaction manager with ACID guarantees
67pub struct TransactionManager {
68    /// Next transaction ID
69    next_tx_id: Arc<AtomicU64>,
70    /// Write-Ahead Log
71    wal: Arc<RwLock<WriteAheadLog>>,
72    /// Snapshot manager for MVCC
73    snapshot_mgr: Arc<RwLock<SnapshotManager>>,
74    /// Active transactions
75    active_transactions: Arc<RwLock<Vec<TransactionId>>>,
76    /// Graph-level locks for named graph transactions
77    graph_locks: Arc<RwLock<AHashSet<GraphName>>>,
78    /// Metrics
79    commit_counter: Arc<Counter>,
80    abort_counter: Arc<Counter>,
81    commit_timer: Arc<Timer>,
82}
83
84impl TransactionManager {
85    /// Create a new transaction manager
86    pub fn new(wal_dir: impl AsRef<Path>) -> Result<Self, OxirsError> {
87        let wal = WriteAheadLog::new(wal_dir)?;
88
89        Ok(Self {
90            next_tx_id: Arc::new(AtomicU64::new(1)),
91            wal: Arc::new(RwLock::new(wal)),
92            snapshot_mgr: Arc::new(RwLock::new(SnapshotManager::new())),
93            active_transactions: Arc::new(RwLock::new(Vec::new())),
94            graph_locks: Arc::new(RwLock::new(AHashSet::new())),
95            commit_counter: Arc::new(Counter::new("transaction.commits".to_string())),
96            abort_counter: Arc::new(Counter::new("transaction.aborts".to_string())),
97            commit_timer: Arc::new(Timer::new("transaction.commit_time".to_string())),
98        })
99    }
100
101    /// Begin a new transaction with specified isolation level
102    pub fn begin(&mut self, isolation: IsolationLevel) -> Result<AcidTransaction, OxirsError> {
103        let tx_id = TransactionId(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
104
105        // Create snapshot for MVCC
106        let snapshot = match isolation {
107            IsolationLevel::Snapshot | IsolationLevel::Serializable => {
108                let mut snapshot_mgr = self
109                    .snapshot_mgr
110                    .write()
111                    .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
112                Some(snapshot_mgr.create_snapshot(tx_id))
113            }
114            _ => None,
115        };
116
117        // Register as active transaction
118        let mut active = self
119            .active_transactions
120            .write()
121            .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
122        active.push(tx_id);
123
124        Ok(AcidTransaction::new(
125            tx_id,
126            isolation,
127            snapshot,
128            self.wal.clone(),
129            self.commit_counter.clone(),
130            self.abort_counter.clone(),
131            self.commit_timer.clone(),
132        ))
133    }
134
135    /// Begin a new named graph transaction with specified isolation level
136    ///
137    /// This provides graph-specific operations with ACID guarantees:
138    /// - Atomic multi-graph operations
139    /// - Graph-level isolation
140    /// - MVCC snapshot isolation per graph
141    pub fn begin_named_graph_transaction(
142        &mut self,
143        isolation: IsolationLevel,
144    ) -> Result<NamedGraphTransaction, OxirsError> {
145        let inner = self.begin(isolation)?;
146        Ok(NamedGraphTransaction::new(inner, self.graph_locks.clone()))
147    }
148
149    /// Recover from WAL after crash
150    ///
151    /// This method replays committed transactions from the Write-Ahead Log
152    /// using callback functions to apply operations to the store.
153    ///
154    /// # Arguments
155    ///
156    /// * `insert_fn` - Callback to insert a quad into the store
157    /// * `delete_fn` - Callback to delete a quad from the store
158    ///
159    /// # Example
160    ///
161    /// ```ignore
162    /// let mut tx_mgr = TransactionManager::new("./wal")?;
163    /// let mut store = RdfStore::new()?;
164    ///
165    /// tx_mgr.recover(
166    ///     |quad| store.insert_quad(quad),
167    ///     |quad| store.remove_quad(quad)
168    /// )?;
169    /// ```
170    pub fn recover<F, G>(&mut self, insert_fn: F, delete_fn: G) -> Result<usize, OxirsError>
171    where
172        F: FnMut(Quad) -> Result<bool, OxirsError>,
173        G: FnMut(&Quad) -> Result<bool, OxirsError>,
174    {
175        // Create a recovery handler from the WAL path
176        let wal_guard = self
177            .wal
178            .read()
179            .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
180
181        // Get the WAL path to create a recovery handler
182        let wal_path = wal_guard.path().to_path_buf();
183        drop(wal_guard);
184
185        let recovery = WalRecovery::new(&wal_path)?;
186        recovery.replay(insert_fn, delete_fn)
187    }
188
189    /// Checkpoint WAL (write buffered entries to disk)
190    pub fn checkpoint(&mut self) -> Result<(), OxirsError> {
191        let mut wal = self
192            .wal
193            .write()
194            .map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
195
196        wal.checkpoint()
197    }
198
199    /// Get transaction statistics
200    pub fn stats(&self) -> TransactionStats {
201        TransactionStats {
202            total_commits: self.commit_counter.get(),
203            total_aborts: self.abort_counter.get(),
204            active_count: self
205                .active_transactions
206                .read()
207                .ok()
208                .map(|a| a.len())
209                .unwrap_or(0),
210            avg_commit_time_ms: {
211                let timer_stats = self.commit_timer.get_stats();
212                timer_stats.mean * 1000.0
213            },
214        }
215    }
216}
217
/// Point-in-time transaction statistics, as returned by
/// `TransactionManager::stats`.
///
/// The struct is plain data: it can be cloned, compared with `==` (handy in
/// assertions) and default-constructed with every figure set to zero.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct TransactionStats {
    /// Total committed transactions
    pub total_commits: u64,
    /// Total aborted transactions
    pub total_aborts: u64,
    /// Active transactions count
    pub active_count: usize,
    /// Average commit time in milliseconds
    pub avg_commit_time_ms: f64,
}
230
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a manager on top of a fresh temporary WAL directory.
    ///
    /// The directory guard is returned first so that callers binding the
    /// tuple keep the directory alive for as long as the manager exists.
    fn fresh_manager() -> Result<(tempfile::TempDir, TransactionManager), OxirsError> {
        let wal_dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
        let manager = TransactionManager::new(wal_dir.path())?;
        Ok((wal_dir, manager))
    }

    /// A freshly created manager reports no commits and no aborts.
    #[test]
    fn test_transaction_manager_creation() -> Result<(), OxirsError> {
        let (_wal_dir, manager) = fresh_manager()?;

        let snapshot = manager.stats();
        assert_eq!(snapshot.total_commits, 0);
        assert_eq!(snapshot.total_aborts, 0);

        Ok(())
    }

    /// The first transaction handed out by a new manager carries id 1.
    #[test]
    fn test_begin_transaction() -> Result<(), OxirsError> {
        let (_wal_dir, mut manager) = fresh_manager()?;

        let first_tx = manager.begin(IsolationLevel::Snapshot)?;
        assert_eq!(first_tx.id().0, 1);

        Ok(())
    }
}
258}