pub mod acid_transaction;
pub mod named_graph;
pub mod snapshot;
pub mod wal;
pub use acid_transaction::{AcidTransaction, TransactionId, TransactionState};
pub use named_graph::{GraphStats, NamedGraphTransaction};
pub use snapshot::{MvccSnapshot, SnapshotManager, VersionedQuad};
pub use wal::{WalEntry, WalRecovery, WalValidation, WriteAheadLog};
use crate::model::{GraphName, Quad};
use crate::OxirsError;
use ahash::AHashSet;
use scirs2_core::metrics::{Counter, Timer};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Snapshot,
Serializable,
}
pub struct TransactionManager {
next_tx_id: Arc<AtomicU64>,
wal: Arc<RwLock<WriteAheadLog>>,
snapshot_mgr: Arc<RwLock<SnapshotManager>>,
active_transactions: Arc<RwLock<Vec<TransactionId>>>,
graph_locks: Arc<RwLock<AHashSet<GraphName>>>,
commit_counter: Arc<Counter>,
abort_counter: Arc<Counter>,
commit_timer: Arc<Timer>,
}
impl TransactionManager {
pub fn new(wal_dir: impl AsRef<Path>) -> Result<Self, OxirsError> {
let wal = WriteAheadLog::new(wal_dir)?;
Ok(Self {
next_tx_id: Arc::new(AtomicU64::new(1)),
wal: Arc::new(RwLock::new(wal)),
snapshot_mgr: Arc::new(RwLock::new(SnapshotManager::new())),
active_transactions: Arc::new(RwLock::new(Vec::new())),
graph_locks: Arc::new(RwLock::new(AHashSet::new())),
commit_counter: Arc::new(Counter::new("transaction.commits".to_string())),
abort_counter: Arc::new(Counter::new("transaction.aborts".to_string())),
commit_timer: Arc::new(Timer::new("transaction.commit_time".to_string())),
})
}
pub fn begin(&mut self, isolation: IsolationLevel) -> Result<AcidTransaction, OxirsError> {
let tx_id = TransactionId(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
let snapshot = match isolation {
IsolationLevel::Snapshot | IsolationLevel::Serializable => {
let mut snapshot_mgr = self
.snapshot_mgr
.write()
.map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
Some(snapshot_mgr.create_snapshot(tx_id))
}
_ => None,
};
let mut active = self
.active_transactions
.write()
.map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
active.push(tx_id);
Ok(AcidTransaction::new(
tx_id,
isolation,
snapshot,
self.wal.clone(),
self.commit_counter.clone(),
self.abort_counter.clone(),
self.commit_timer.clone(),
))
}
pub fn begin_named_graph_transaction(
&mut self,
isolation: IsolationLevel,
) -> Result<NamedGraphTransaction, OxirsError> {
let inner = self.begin(isolation)?;
Ok(NamedGraphTransaction::new(inner, self.graph_locks.clone()))
}
pub fn recover<F, G>(&mut self, insert_fn: F, delete_fn: G) -> Result<usize, OxirsError>
where
F: FnMut(Quad) -> Result<bool, OxirsError>,
G: FnMut(&Quad) -> Result<bool, OxirsError>,
{
let wal_guard = self
.wal
.read()
.map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
let wal_path = wal_guard.path().to_path_buf();
drop(wal_guard);
let recovery = WalRecovery::new(&wal_path)?;
recovery.replay(insert_fn, delete_fn)
}
pub fn checkpoint(&mut self) -> Result<(), OxirsError> {
let mut wal = self
.wal
.write()
.map_err(|_| OxirsError::ConcurrencyError("Lock poisoned".to_string()))?;
wal.checkpoint()
}
pub fn stats(&self) -> TransactionStats {
TransactionStats {
total_commits: self.commit_counter.get(),
total_aborts: self.abort_counter.get(),
active_count: self
.active_transactions
.read()
.ok()
.map(|a| a.len())
.unwrap_or(0),
avg_commit_time_ms: {
let timer_stats = self.commit_timer.get_stats();
timer_stats.mean * 1000.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct TransactionStats {
pub total_commits: u64,
pub total_aborts: u64,
pub active_count: usize,
pub avg_commit_time_ms: f64,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_transaction_manager_creation() -> Result<(), OxirsError> {
let dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
let tx_mgr = TransactionManager::new(dir.path())?;
let stats = tx_mgr.stats();
assert_eq!(stats.total_commits, 0);
assert_eq!(stats.total_aborts, 0);
Ok(())
}
#[test]
fn test_begin_transaction() -> Result<(), OxirsError> {
let dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
let mut tx_mgr = TransactionManager::new(dir.path())?;
let tx = tx_mgr.begin(IsolationLevel::Snapshot)?;
assert_eq!(tx.id().0, 1);
Ok(())
}
}