use super::{IsolationLevel, MvccSnapshot, WalEntry, WriteAheadLog};
use crate::model::Quad;
use crate::OxirsError;
use scirs2_core::metrics::{Counter, Timer};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TransactionId(pub u64);
impl TransactionId {
pub fn raw(&self) -> u64 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
Active,
Preparing,
Committed,
Aborted,
}
pub struct AcidTransaction {
id: TransactionId,
state: TransactionState,
isolation: IsolationLevel,
#[allow(dead_code)]
snapshot: Option<MvccSnapshot>,
pending_inserts: Vec<Quad>,
pending_deletes: Vec<Quad>,
read_set: HashMap<Quad, u64>,
write_set: HashMap<Quad, QuadVersion>,
wal: Arc<RwLock<WriteAheadLog>>,
commit_counter: Arc<Counter>,
abort_counter: Arc<Counter>,
commit_timer: Arc<Timer>,
}
#[derive(Debug, Clone)]
struct QuadVersion {
#[allow(dead_code)]
quad: Quad,
version: u64,
#[allow(dead_code)]
created_by: TransactionId,
deleted_by: Option<TransactionId>,
}
impl AcidTransaction {
pub(super) fn new(
id: TransactionId,
isolation: IsolationLevel,
snapshot: Option<MvccSnapshot>,
wal: Arc<RwLock<WriteAheadLog>>,
commit_counter: Arc<Counter>,
abort_counter: Arc<Counter>,
commit_timer: Arc<Timer>,
) -> Self {
Self {
id,
state: TransactionState::Active,
isolation,
snapshot,
pending_inserts: Vec::new(),
pending_deletes: Vec::new(),
read_set: HashMap::new(),
write_set: HashMap::new(),
wal,
commit_counter,
abort_counter,
commit_timer,
}
}
pub fn id(&self) -> TransactionId {
self.id
}
pub fn state(&self) -> TransactionState {
self.state
}
pub fn isolation(&self) -> IsolationLevel {
self.isolation
}
pub fn insert(&mut self, quad: Quad) -> Result<bool, OxirsError> {
self.check_active()?;
if self.pending_inserts.contains(&quad) {
return Ok(false);
}
if let Some(pos) = self.pending_deletes.iter().position(|q| q == &quad) {
self.pending_deletes.remove(pos);
return Ok(true);
}
self.write_set.insert(
quad.clone(),
QuadVersion {
quad: quad.clone(),
version: self.id.0,
created_by: self.id,
deleted_by: None,
},
);
self.pending_inserts.push(quad.clone());
self.write_to_wal(WalEntry::Insert {
tx_id: self.id.0,
quad,
})?;
Ok(true)
}
pub fn delete(&mut self, quad: Quad) -> Result<bool, OxirsError> {
self.check_active()?;
if self.pending_deletes.contains(&quad) {
return Ok(false);
}
if let Some(pos) = self.pending_inserts.iter().position(|q| q == &quad) {
self.pending_inserts.remove(pos);
return Ok(true);
}
if let Some(version) = self.write_set.get_mut(&quad) {
version.deleted_by = Some(self.id);
} else {
self.write_set.insert(
quad.clone(),
QuadVersion {
quad: quad.clone(),
version: self.id.0,
created_by: self.id,
deleted_by: Some(self.id),
},
);
}
self.pending_deletes.push(quad.clone());
self.write_to_wal(WalEntry::Delete {
tx_id: self.id.0,
quad,
})?;
Ok(true)
}
pub fn record_read(&mut self, quad: &Quad) -> Result<(), OxirsError> {
self.check_active()?;
self.read_set.insert(quad.clone(), self.id.0);
Ok(())
}
fn validate(&self) -> Result<(), OxirsError> {
if self.isolation == IsolationLevel::Serializable {
for (quad, version) in &self.read_set {
if let Some(write_version) = self.write_set.get(quad) {
if write_version.version > *version {
return Err(OxirsError::ConcurrencyError(
"Read-write conflict detected".to_string(),
));
}
}
}
}
let mut seen_inserts = std::collections::HashSet::new();
for quad in &self.pending_inserts {
if !seen_inserts.insert(quad) {
return Err(OxirsError::Store(format!(
"Duplicate insert detected in transaction: {:?}",
quad
)));
}
}
for insert_quad in &self.pending_inserts {
for delete_quad in &self.pending_deletes {
if insert_quad == delete_quad {
return Err(OxirsError::Store(format!(
"Conflicting insert/delete operations for quad: {:?}",
insert_quad
)));
}
}
}
let mut write_conflicts = 0;
for (quad, version) in &self.write_set {
if version.deleted_by.is_some() && version.created_by == self.id {
write_conflicts += 1;
}
if version.version > self.id.0 {
return Err(OxirsError::ConcurrencyError(format!(
"Version inconsistency detected for quad: {:?}",
quad
)));
}
}
const MAX_PENDING_OPS: usize = 1_000_000; let total_ops = self.pending_inserts.len() + self.pending_deletes.len();
if total_ops > MAX_PENDING_OPS {
return Err(OxirsError::Store(format!(
"Transaction exceeds maximum operation limit: {} > {}",
total_ops, MAX_PENDING_OPS
)));
}
for quad in &self.pending_inserts {
use crate::model::RdfTerm;
if quad.predicate().as_str().is_empty() {
return Err(OxirsError::Store(
"Invalid predicate in quad: predicate cannot be empty".to_string(),
));
}
}
if self.isolation == IsolationLevel::Snapshot {
if !self.read_set.is_empty() && write_conflicts > self.pending_deletes.len() / 2 {
return Err(OxirsError::ConcurrencyError(
"Snapshot isolation violation: too many write conflicts".to_string(),
));
}
}
for delete_quad in &self.pending_deletes {
if self.pending_inserts.contains(delete_quad) {
return Err(OxirsError::Store(format!(
"Cannot delete quad that was just inserted: {:?}",
delete_quad
)));
}
}
if self.read_set.len() > 1000 && self.write_set.len() > 1000 {
tracing::warn!(
"Transaction {} has large read/write sets - potential deadlock risk",
self.id.0
);
}
tracing::debug!(
"Transaction {} validated: {} inserts, {} deletes, {} reads, {} writes",
self.id.0,
self.pending_inserts.len(),
self.pending_deletes.len(),
self.read_set.len(),
self.write_set.len()
);
Ok(())
}
pub fn commit(mut self) -> Result<(), OxirsError> {
self.check_active()?;
let _timer_guard = self.commit_timer.start();
self.state = TransactionState::Preparing;
self.validate()?;
self.write_to_wal(WalEntry::Commit { tx_id: self.id.0 })?;
self.flush_wal()?;
self.state = TransactionState::Committed;
self.commit_counter.add(1);
tracing::info!(
"Transaction {} committed successfully with {} inserts and {} deletes",
self.id.0,
self.pending_inserts.len(),
self.pending_deletes.len()
);
Ok(())
}
pub fn abort(mut self) -> Result<(), OxirsError> {
if self.state == TransactionState::Committed {
return Err(OxirsError::Store(
"Cannot abort committed transaction".to_string(),
));
}
self.write_to_wal(WalEntry::Abort { tx_id: self.id.0 })?;
self.state = TransactionState::Aborted;
self.abort_counter.add(1);
self.pending_inserts.clear();
self.pending_deletes.clear();
self.read_set.clear();
self.write_set.clear();
tracing::info!("Transaction {} aborted", self.id.0);
Ok(())
}
pub fn pending_inserts(&self) -> &[Quad] {
&self.pending_inserts
}
pub fn pending_deletes(&self) -> &[Quad] {
&self.pending_deletes
}
pub fn operation_count(&self) -> usize {
self.pending_inserts.len() + self.pending_deletes.len()
}
fn check_active(&self) -> Result<(), OxirsError> {
if self.state != TransactionState::Active {
return Err(OxirsError::Store(format!(
"Transaction is not active (state: {:?})",
self.state
)));
}
Ok(())
}
fn write_to_wal(&self, entry: WalEntry) -> Result<(), OxirsError> {
let mut wal = self
.wal
.write()
.map_err(|_| OxirsError::ConcurrencyError("WAL lock poisoned".to_string()))?;
wal.append(entry)
}
fn flush_wal(&self) -> Result<(), OxirsError> {
let mut wal = self
.wal
.write()
.map_err(|_| OxirsError::ConcurrencyError("WAL lock poisoned".to_string()))?;
wal.flush()
}
}
impl Drop for AcidTransaction {
fn drop(&mut self) {
if self.state == TransactionState::Active {
tracing::warn!(
"Transaction {} dropped without commit or abort, auto-aborting",
self.id.0
);
let _ = self.write_to_wal(WalEntry::Abort { tx_id: self.id.0 });
self.abort_counter.add(1);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{GraphName, Literal, NamedNode, Object, Predicate, Subject};
use tempfile::tempdir;
fn create_test_quad(id: usize) -> Quad {
Quad::new(
Subject::NamedNode(
NamedNode::new(format!("http://s{}", id)).expect("valid IRI from format"),
),
Predicate::NamedNode(
NamedNode::new(format!("http://p{}", id)).expect("valid IRI from format"),
),
Object::Literal(Literal::new(format!("value{}", id))),
GraphName::DefaultGraph,
)
}
#[test]
fn test_transaction_insert() -> Result<(), OxirsError> {
let dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
let wal = Arc::new(RwLock::new(WriteAheadLog::new(dir.path())?));
let mut tx = AcidTransaction::new(
TransactionId(1),
IsolationLevel::Snapshot,
None,
wal,
Arc::new(Counter::new("test.commits".to_string())),
Arc::new(Counter::new("test.aborts".to_string())),
Arc::new(Timer::new("test.commit_time".to_string())),
);
let quad = create_test_quad(1);
assert!(tx.insert(quad.clone())?);
assert!(!tx.insert(quad)?);
assert_eq!(tx.pending_inserts().len(), 1);
Ok(())
}
#[test]
fn test_transaction_delete() -> Result<(), OxirsError> {
let dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
let wal = Arc::new(RwLock::new(WriteAheadLog::new(dir.path())?));
let mut tx = AcidTransaction::new(
TransactionId(1),
IsolationLevel::Snapshot,
None,
wal,
Arc::new(Counter::new("test.commits".to_string())),
Arc::new(Counter::new("test.aborts".to_string())),
Arc::new(Timer::new("test.commit_time".to_string())),
);
let quad = create_test_quad(1);
assert!(tx.delete(quad.clone())?);
assert!(!tx.delete(quad)?);
assert_eq!(tx.pending_deletes().len(), 1);
Ok(())
}
#[test]
fn test_transaction_commit() -> Result<(), OxirsError> {
let dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
let wal = Arc::new(RwLock::new(WriteAheadLog::new(dir.path())?));
let mut tx = AcidTransaction::new(
TransactionId(1),
IsolationLevel::Snapshot,
None,
wal,
Arc::new(Counter::new("test.commits".to_string())),
Arc::new(Counter::new("test.aborts".to_string())),
Arc::new(Timer::new("test.commit_time".to_string())),
);
let quad = create_test_quad(1);
tx.insert(quad)?;
assert_eq!(tx.state(), TransactionState::Active);
tx.commit()?;
Ok(())
}
#[test]
fn test_transaction_abort() -> Result<(), OxirsError> {
let dir = tempdir().map_err(|e| OxirsError::Io(e.to_string()))?;
let wal = Arc::new(RwLock::new(WriteAheadLog::new(dir.path())?));
let mut tx = AcidTransaction::new(
TransactionId(1),
IsolationLevel::Snapshot,
None,
wal,
Arc::new(Counter::new("test.commits".to_string())),
Arc::new(Counter::new("test.aborts".to_string())),
Arc::new(Timer::new("test.commit_time".to_string())),
);
let quad = create_test_quad(1);
tx.insert(quad)?;
assert_eq!(tx.state(), TransactionState::Active);
tx.abort()?;
Ok(())
}
}