use crate::RuleAtom;
use anyhow::Result;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use tracing::{debug, info, warn};
/// Numeric identifier of a transaction; the manager allocates these from a
/// counter that starts at 0 and increases by one per started transaction.
pub type TransactionId = u64;
/// Isolation level requested when a transaction is started.
///
/// NOTE(review): in this file the level is only stored on the [`Transaction`]
/// and written to the log message at start; no visible code branches on it, so
/// confirm where (or whether) the levels are actually enforced.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}
/// Lifecycle state of a [`Transaction`].
///
/// Equality on this enum is total, so it derives `Eq` alongside `PartialEq`
/// (matching [`IsolationLevel`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionState {
    /// Initial state; the only state in which operations may be buffered and
    /// in which commit or rollback is accepted.
    Active,
    /// Set while a commit is applying the buffered operations.
    Committing,
    /// Final state after a successful commit.
    Committed,
    /// Set while a rollback is discarding the buffered operations.
    RollingBack,
    /// Final state after a rollback.
    Aborted,
}
/// A change buffered inside a transaction and applied to the committed fact
/// list only when the transaction commits.
#[derive(Debug, Clone)]
pub enum Operation {
/// Append one fact to the committed facts.
AddFact(RuleAtom),
/// Remove every committed fact equal to the given one.
RemoveFact(RuleAtom),
/// Append all of the given facts, in order.
AddFacts(Vec<RuleAtom>),
/// Remove every committed fact equal to any of the given ones.
/// NOTE(review): no method in this file buffers this variant; it is only
/// handled on commit.
RemoveFacts(Vec<RuleAtom>),
}
/// Bookkeeping record for one transaction tracked by [`TransactionManager`].
#[derive(Debug, Clone)]
pub struct Transaction {
/// Identifier assigned when the transaction was started.
pub id: TransactionId,
/// Isolation level requested at start.
pub isolation_level: IsolationLevel,
/// Current lifecycle state.
pub state: TransactionState,
/// Operations buffered so far, in the order they were added; applied on
/// commit and cleared on rollback.
pub operations: Vec<Operation>,
/// Value of the manager's logical timestamp counter when the transaction
/// started (a counter value, not wall-clock time).
pub start_time: u64,
/// Logical timestamp at which the transaction was committed or rolled back;
/// `None` until then.
pub end_time: Option<u64>,
}
/// Tracks transactions, buffers their operations, and applies them to a shared
/// list of committed facts on commit.
///
/// Each piece of mutable state is wrapped in its own `Arc<Mutex<_>>`.
pub struct TransactionManager {
/// Every known transaction (until garbage collected), keyed by id.
transactions: Arc<Mutex<HashMap<TransactionId, Transaction>>>,
/// Next transaction id to hand out.
next_id: Arc<Mutex<TransactionId>>,
/// Facts produced by committed transactions.
committed_facts: Arc<Mutex<Vec<RuleAtom>>>,
/// Log of operations applied by commits, oldest entry first.
log: Arc<Mutex<VecDeque<LogEntry>>>,
/// Logical clock; incremented on transaction start, on each logged
/// operation, and on commit/rollback.
timestamp: Arc<Mutex<u64>>,
/// Maximum number of log entries kept; the oldest are dropped beyond this.
max_log_size: usize,
}
/// One record in the manager's operation log.
///
/// The fields are written when an operation is logged, but nothing in this file
/// reads them back, which is why `dead_code` is allowed here.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct LogEntry {
/// Transaction whose commit applied the operation.
transaction_id: TransactionId,
/// The applied operation.
operation: Operation,
/// Logical timestamp assigned when the entry was logged.
timestamp: u64,
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
impl TransactionManager {
/// Creates an empty manager: no transactions, no committed facts, an empty
/// log, both counters at zero, and a log capacity of 10 000 entries.
pub fn new() -> Self {
    // Every piece of state is shared the same way, so wrap it in one place.
    fn shared<T>(value: T) -> Arc<Mutex<T>> {
        Arc::new(Mutex::new(value))
    }

    let transactions = shared(HashMap::new());
    let next_id = shared(0);
    let committed_facts = shared(Vec::new());
    let log = shared(VecDeque::new());
    let timestamp = shared(0);

    Self {
        transactions,
        next_id,
        committed_facts,
        log,
        timestamp,
        max_log_size: 10_000,
    }
}
/// Starts a new transaction in the `Active` state and returns its id.
///
/// The id comes from the `next_id` counter and the start time from the
/// logical `timestamp` counter; both are advanced by one.
///
/// # Errors
///
/// No error path exists in this body; the `Result` is part of the interface.
///
/// # Panics
///
/// Panics if any of the internal mutexes is poisoned.
pub fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
    // Allocate the id and start time together, then release both counters
    // BEFORE touching the transaction table. `commit` and `rollback` lock
    // `transactions` first and `timestamp` afterwards; holding `timestamp`
    // while waiting for `transactions` here would invert that order and
    // allow two threads to deadlock.
    let (tx_id, start_time) = {
        let mut next_id = self.next_id.lock().expect("lock poisoned");
        let mut timestamp = self.timestamp.lock().expect("lock poisoned");
        let id = *next_id;
        *next_id += 1;
        *timestamp += 1;
        (id, *timestamp)
    };

    let transaction = Transaction {
        id: tx_id,
        isolation_level,
        state: TransactionState::Active,
        operations: Vec::new(),
        start_time,
        end_time: None,
    };

    // Scope the table guard so the log call below runs without any lock held.
    {
        let mut transactions = self.transactions.lock().expect("lock poisoned");
        transactions.insert(tx_id, transaction);
    }

    info!(
        "Started transaction {} with isolation level {:?}",
        tx_id, isolation_level
    );
    Ok(tx_id)
}
/// Buffers an `AddFact` operation in transaction `tx_id`.
///
/// The fact becomes part of the committed facts only when the transaction is
/// committed.
///
/// # Errors
///
/// Returns an error if no transaction with this id exists or if the
/// transaction is not in the `Active` state.
///
/// # Panics
///
/// Panics if the transaction table mutex is poisoned.
pub fn add_fact(&self, tx_id: TransactionId, fact: RuleAtom) -> Result<()> {
    let mut transactions = self.transactions.lock().expect("lock poisoned");
    let transaction = transactions
        .get_mut(&tx_id)
        .ok_or_else(|| anyhow::anyhow!("Transaction {} not found", tx_id))?;
    if transaction.state != TransactionState::Active {
        return Err(anyhow::anyhow!(
            "Transaction {} is not active (state: {:?})",
            tx_id,
            transaction.state
        ));
    }
    // `fact` is already owned, so move it into the buffer instead of cloning.
    transaction.operations.push(Operation::AddFact(fact));
    debug!("Added fact to transaction {}", tx_id);
    Ok(())
}
/// Buffers a `RemoveFact` operation in transaction `tx_id`; the removal is
/// applied to the committed facts when the transaction commits.
///
/// # Errors
///
/// Fails when the id is unknown or the transaction is not `Active`.
///
/// # Panics
///
/// Panics if the transaction table mutex is poisoned.
pub fn remove_fact(&self, tx_id: TransactionId, fact: RuleAtom) -> Result<()> {
    let mut tx_table = self.transactions.lock().expect("lock poisoned");
    let tx = match tx_table.get_mut(&tx_id) {
        Some(tx) => tx,
        None => return Err(anyhow::anyhow!("Transaction {} not found", tx_id)),
    };
    if !matches!(tx.state, TransactionState::Active) {
        return Err(anyhow::anyhow!("Transaction {} is not active", tx_id));
    }
    let pending = Operation::RemoveFact(fact);
    tx.operations.push(pending);
    debug!("Removed fact in transaction {}", tx_id);
    Ok(())
}
/// Buffers a single `AddFacts` operation carrying all of `facts` in
/// transaction `tx_id`.
///
/// # Errors
///
/// Returns an error if no transaction with this id exists or if the
/// transaction is not in the `Active` state.
///
/// # Panics
///
/// Panics if the transaction table mutex is poisoned.
pub fn add_facts(&self, tx_id: TransactionId, facts: Vec<RuleAtom>) -> Result<()> {
    let mut transactions = self.transactions.lock().expect("lock poisoned");
    let transaction = transactions
        .get_mut(&tx_id)
        .ok_or_else(|| anyhow::anyhow!("Transaction {} not found", tx_id))?;
    if transaction.state != TransactionState::Active {
        return Err(anyhow::anyhow!("Transaction {} is not active", tx_id));
    }
    // Record the length first so the vector can be moved into the buffer
    // rather than cloned just to keep it alive for the log message.
    let fact_count = facts.len();
    transaction.operations.push(Operation::AddFacts(facts));
    debug!("Added {} facts to transaction {}", fact_count, tx_id);
    Ok(())
}
/// Commits transaction `tx_id`: applies its buffered operations, in order, to
/// the committed facts, logs each one, and marks the transaction `Committed`
/// with an end timestamp.
///
/// Locks are taken in the order `transactions`, `committed_facts`, then (via
/// `log_operation`) `log` and `timestamp`.
///
/// # Errors
///
/// Fails when the id is unknown, when the transaction is not `Active`, or
/// when logging an operation fails.
///
/// # Panics
///
/// Panics if any of the internal mutexes is poisoned.
pub fn commit(&self, tx_id: TransactionId) -> Result<()> {
    let mut tx_table = self.transactions.lock().expect("lock poisoned");
    let tx = match tx_table.get_mut(&tx_id) {
        Some(tx) => tx,
        None => return Err(anyhow::anyhow!("Transaction {} not found", tx_id)),
    };
    if tx.state != TransactionState::Active {
        return Err(anyhow::anyhow!(
            "Cannot commit transaction {} in state {:?}",
            tx_id,
            tx.state
        ));
    }
    tx.state = TransactionState::Committing;

    let mut store = self.committed_facts.lock().expect("lock poisoned");
    for op in tx.operations.iter() {
        match op {
            Operation::AddFact(fact) => store.push(fact.clone()),
            Operation::RemoveFact(fact) => store.retain(|existing| existing != fact),
            Operation::AddFacts(facts) => store.extend(facts.iter().cloned()),
            Operation::RemoveFacts(facts) => {
                for fact in facts {
                    store.retain(|existing| existing != fact);
                }
            }
        }
        // Every operation is logged right after it has been applied.
        self.log_operation(tx_id, op.clone())?;
    }

    let mut clock = self.timestamp.lock().expect("lock poisoned");
    *clock += 1;
    tx.end_time = Some(*clock);
    tx.state = TransactionState::Committed;
    info!("Committed transaction {}", tx_id);
    Ok(())
}
/// Rolls back transaction `tx_id`: discards its buffered operations and marks
/// it `Aborted` with an end timestamp. Committed facts are not touched.
///
/// # Errors
///
/// Fails when the id is unknown or the transaction is not `Active`.
///
/// # Panics
///
/// Panics if an internal mutex is poisoned.
pub fn rollback(&self, tx_id: TransactionId) -> Result<()> {
    let mut tx_table = self.transactions.lock().expect("lock poisoned");
    let tx = match tx_table.get_mut(&tx_id) {
        Some(tx) => tx,
        None => return Err(anyhow::anyhow!("Transaction {} not found", tx_id)),
    };
    if tx.state != TransactionState::Active {
        return Err(anyhow::anyhow!(
            "Cannot rollback transaction {} in state {:?}",
            tx_id,
            tx.state
        ));
    }

    tx.state = TransactionState::RollingBack;
    // Nothing was applied yet, so dropping the buffer is the whole undo.
    tx.operations.clear();

    let mut clock = self.timestamp.lock().expect("lock poisoned");
    *clock += 1;
    tx.end_time = Some(*clock);
    tx.state = TransactionState::Aborted;
    warn!("Rolled back transaction {}", tx_id);
    Ok(())
}
/// Returns a snapshot (clone) of the facts committed so far.
///
/// # Panics
///
/// Panics if the committed-facts mutex is poisoned.
pub fn get_committed_facts(&self) -> Vec<RuleAtom> {
    let store = self.committed_facts.lock().expect("lock poisoned");
    store.to_vec()
}
/// Returns the current state of transaction `tx_id`, or `None` when no
/// transaction with that id is tracked.
///
/// # Panics
///
/// Panics if the transaction table mutex is poisoned.
pub fn get_transaction_state(&self, tx_id: TransactionId) -> Option<TransactionState> {
    let tx_table = self.transactions.lock().expect("lock poisoned");
    let tx = tx_table.get(&tx_id)?;
    Some(tx.state.clone())
}
/// Returns `true` only when transaction `tx_id` exists and is `Active`.
pub fn is_active(&self, tx_id: TransactionId) -> bool {
    let state = self.get_transaction_state(tx_id);
    state == Some(TransactionState::Active)
}
/// Appends `operation` to the log with a fresh logical timestamp, then drops
/// the oldest entries until at most `max_log_size` remain.
///
/// Locks `log` first and `timestamp` second. This body has no error path.
///
/// # Panics
///
/// Panics if either mutex is poisoned.
fn log_operation(&self, tx_id: TransactionId, operation: Operation) -> Result<()> {
    let mut entries = self.log.lock().expect("lock poisoned");
    let mut clock = self.timestamp.lock().expect("lock poisoned");
    *clock += 1;

    entries.push_back(LogEntry {
        transaction_id: tx_id,
        operation,
        timestamp: *clock,
    });

    // Trim from the front so the newest entries are the ones kept.
    let overflow = entries.len().saturating_sub(self.max_log_size);
    for _ in 0..overflow {
        entries.pop_front();
    }
    Ok(())
}
/// Collects counts of tracked transactions (total and per `Active`,
/// `Committed`, `Aborted` state), committed facts, and log entries.
///
/// Transactions in `Committing` or `RollingBack` count towards the total only.
///
/// # Panics
///
/// Panics if any of the internal mutexes is poisoned.
pub fn get_stats(&self) -> TransactionStats {
    let tx_table = self.transactions.lock().expect("lock poisoned");
    let store = self.committed_facts.lock().expect("lock poisoned");
    let entries = self.log.lock().expect("lock poisoned");

    let mut active = 0usize;
    let mut committed = 0usize;
    let mut aborted = 0usize;
    for tx in tx_table.values() {
        match tx.state {
            TransactionState::Active => active += 1,
            TransactionState::Committed => committed += 1,
            TransactionState::Aborted => aborted += 1,
            TransactionState::Committing | TransactionState::RollingBack => {}
        }
    }

    TransactionStats {
        total_transactions: tx_table.len(),
        active_transactions: active,
        committed_transactions: committed,
        aborted_transactions: aborted,
        committed_facts: store.len(),
        log_entries: entries.len(),
    }
}
/// Drops every tracked transaction that is neither `Active` nor `Committing`
/// from the transaction table.
///
/// # Panics
///
/// Panics if the transaction table mutex is poisoned.
pub fn gc_completed_transactions(&self) {
    let mut tx_table = self.transactions.lock().expect("lock poisoned");
    tx_table.retain(|_, tx| {
        matches!(
            tx.state,
            TransactionState::Active | TransactionState::Committing
        )
    });
    debug!("Garbage collected completed transactions");
}
}
/// Point-in-time counters describing a transaction manager.
#[derive(Debug, Clone)]
pub struct TransactionStats {
    /// Number of transactions currently tracked, in any state.
    pub total_transactions: usize,
    /// Tracked transactions in the `Active` state.
    pub active_transactions: usize,
    /// Tracked transactions in the `Committed` state.
    pub committed_transactions: usize,
    /// Tracked transactions in the `Aborted` state.
    pub aborted_transactions: usize,
    /// Number of facts in the committed fact list.
    pub committed_facts: usize,
    /// Number of entries currently held in the operation log.
    pub log_entries: usize,
}

/// Renders the counters on one line, e.g.
/// `Total: 2, Active: 1, Committed: 1, Aborted: 0, Facts: 3, Log: 3`.
impl std::fmt::Display for TransactionStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructuring makes the compiler flag this impl if a field is added.
        let Self {
            total_transactions: total,
            active_transactions: active,
            committed_transactions: committed,
            aborted_transactions: aborted,
            committed_facts: facts,
            log_entries: log,
        } = self;
        write!(
            f,
            "Total: {}, Active: {}, Committed: {}, Aborted: {}, Facts: {}, Log: {}",
            total, active, committed, aborted, facts, log
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Term;

    /// The triple (`john`, `age`, `"30"`) shared by the fact-based tests.
    fn sample_fact() -> RuleAtom {
        RuleAtom::Triple {
            subject: Term::Constant("john".to_string()),
            predicate: Term::Constant("age".to_string()),
            object: Term::Literal("30".to_string()),
        }
    }

    // The first transaction of a fresh manager gets id 0 and is active.
    #[test]
    fn test_begin_transaction() -> Result<(), Box<dyn std::error::Error>> {
        let manager = TransactionManager::new();
        let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        assert_eq!(tx_id, 0);
        assert!(manager.is_active(tx_id));
        Ok(())
    }

    // Buffering a fact leaves the transaction active.
    #[test]
    fn test_add_fact() -> Result<(), Box<dyn std::error::Error>> {
        let manager = TransactionManager::new();
        let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        manager.add_fact(tx_id, sample_fact())?;
        assert!(manager.is_active(tx_id));
        Ok(())
    }

    // Committing publishes the buffered fact and marks the transaction committed.
    #[test]
    fn test_commit() -> Result<(), Box<dyn std::error::Error>> {
        let manager = TransactionManager::new();
        let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        manager.add_fact(tx_id, sample_fact())?;
        manager.commit(tx_id)?;
        assert_eq!(manager.get_committed_facts().len(), 1);
        assert_eq!(
            manager.get_transaction_state(tx_id),
            Some(TransactionState::Committed)
        );
        Ok(())
    }

    // Rolling back publishes nothing and marks the transaction aborted.
    #[test]
    fn test_rollback() -> Result<(), Box<dyn std::error::Error>> {
        let manager = TransactionManager::new();
        let tx_id = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        manager.add_fact(tx_id, sample_fact())?;
        manager.rollback(tx_id)?;
        assert_eq!(manager.get_committed_facts().len(), 0);
        assert_eq!(
            manager.get_transaction_state(tx_id),
            Some(TransactionState::Aborted)
        );
        Ok(())
    }

    // Two transactions get distinct ids and are both active.
    #[test]
    fn test_multiple_transactions() -> Result<(), Box<dyn std::error::Error>> {
        let manager = TransactionManager::new();
        let tx1 = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        let tx2 = manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        assert_ne!(tx1, tx2);
        assert!(manager.is_active(tx1));
        assert!(manager.is_active(tx2));
        Ok(())
    }

    // Stats reflect a single active transaction.
    #[test]
    fn test_stats() -> Result<(), Box<dyn std::error::Error>> {
        let manager = TransactionManager::new();
        manager.begin_transaction(IsolationLevel::ReadCommitted)?;
        let stats = manager.get_stats();
        assert_eq!(stats.active_transactions, 1);
        assert_eq!(stats.total_transactions, 1);
        Ok(())
    }
}