use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::event::StreamEvent;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
Preparing,
Prepared,
Committing,
Committed,
Aborting,
Aborted,
Failed { reason: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionMetadata {
pub transaction_id: String,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub state: TransactionState,
pub isolation_level: IsolationLevel,
pub participants: Vec<String>,
pub event_count: usize,
pub timeout: Duration,
pub properties: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionLogEntry {
pub id: u64,
pub transaction_id: String,
pub entry_type: LogEntryType,
pub timestamp: DateTime<Utc>,
pub events: Vec<StreamEvent>,
pub checksum: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogEntryType {
Begin,
EventAdded,
Prepare,
Commit,
Abort,
Checkpoint,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionCheckpoint {
pub checkpoint_id: String,
pub timestamp: DateTime<Utc>,
pub active_transactions: Vec<String>,
pub last_committed_id: Option<String>,
pub event_offset: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionalConfig {
pub enable_exactly_once: bool,
pub default_isolation_level: IsolationLevel,
pub transaction_timeout: Duration,
pub enable_wal: bool,
pub wal_sync_interval: Duration,
pub checkpoint_interval: Duration,
pub max_transaction_size: usize,
pub idempotency_window: Duration,
pub enable_distributed: bool,
pub two_phase_commit_timeout: Duration,
pub enable_background_tasks: bool,
}
impl Default for TransactionalConfig {
fn default() -> Self {
Self {
enable_exactly_once: true,
default_isolation_level: IsolationLevel::ReadCommitted,
transaction_timeout: Duration::from_secs(60),
enable_wal: true,
wal_sync_interval: Duration::from_millis(100),
checkpoint_interval: Duration::from_secs(300),
max_transaction_size: 10000,
idempotency_window: Duration::from_secs(3600),
enable_distributed: false,
two_phase_commit_timeout: Duration::from_secs(30),
enable_background_tasks: true,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct TransactionalStats {
pub transactions_started: u64,
pub transactions_committed: u64,
pub transactions_aborted: u64,
pub events_processed: u64,
pub duplicates_detected: u64,
pub avg_transaction_duration_ms: f64,
pub max_transaction_duration_ms: u64,
pub active_transactions: usize,
pub wal_entries_written: u64,
pub checkpoints_created: u64,
pub two_phase_commit_failures: u64,
}
pub struct TransactionalProcessor {
config: TransactionalConfig,
active_transactions: Arc<DashMap<String, Arc<RwLock<TransactionMetadata>>>>,
transaction_log: Arc<RwLock<Vec<TransactionLogEntry>>>,
processed_events: Arc<DashMap<String, DateTime<Utc>>>,
checkpoints: Arc<RwLock<Vec<TransactionCheckpoint>>>,
stats: Arc<RwLock<TransactionalStats>>,
last_checkpoint: Arc<RwLock<Instant>>,
command_tx: mpsc::UnboundedSender<TransactionCommand>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
_background_task: Option<tokio::task::JoinHandle<()>>,
}
enum TransactionCommand {
Checkpoint,
CleanupExpired,
SyncWal,
Shutdown,
}
impl TransactionalProcessor {
pub fn new(config: TransactionalConfig) -> Self {
let (command_tx, mut command_rx) = mpsc::unbounded_channel();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let active_transactions = Arc::new(DashMap::new());
let transaction_log = Arc::new(RwLock::new(Vec::new()));
let processed_events = Arc::new(DashMap::new());
let checkpoints = Arc::new(RwLock::new(Vec::new()));
let stats = Arc::new(RwLock::new(TransactionalStats::default()));
let last_checkpoint = Arc::new(RwLock::new(Instant::now()));
let background_task = if config.enable_background_tasks {
let active_transactions_clone = active_transactions.clone();
let transaction_log_clone = transaction_log.clone();
let checkpoints_clone = checkpoints.clone();
let stats_clone = stats.clone();
let last_checkpoint_clone = last_checkpoint.clone();
let processed_events_clone = processed_events.clone();
let config_clone = config.clone();
Some(tokio::spawn(async move {
let mut checkpoint_interval =
tokio::time::interval(config_clone.checkpoint_interval);
let mut cleanup_interval = tokio::time::interval(config_clone.idempotency_window);
let mut wal_sync_interval = tokio::time::interval(config_clone.wal_sync_interval);
loop {
tokio::select! {
_ = &mut shutdown_rx => {
debug!("Transactional processor background task shutting down");
break;
}
_ = checkpoint_interval.tick() => {
if let Err(e) = Self::create_checkpoint_internal(
&active_transactions_clone,
&transaction_log_clone,
&checkpoints_clone,
&stats_clone,
&last_checkpoint_clone,
).await {
error!("Failed to create checkpoint: {}", e);
}
}
_ = cleanup_interval.tick() => {
Self::cleanup_expired_events(&processed_events_clone, &config_clone).await;
}
_ = wal_sync_interval.tick() => {
debug!("WAL sync triggered");
}
Some(cmd) = command_rx.recv() => {
match cmd {
TransactionCommand::Checkpoint => {
if let Err(e) = Self::create_checkpoint_internal(
&active_transactions_clone,
&transaction_log_clone,
&checkpoints_clone,
&stats_clone,
&last_checkpoint_clone,
).await {
error!("Manual checkpoint failed: {}", e);
}
}
TransactionCommand::CleanupExpired => {
Self::cleanup_expired_events(&processed_events_clone, &config_clone).await;
}
TransactionCommand::SyncWal => {
debug!("Manual WAL sync triggered");
}
TransactionCommand::Shutdown => {
debug!("Shutdown command received");
break;
}
}
}
}
}
}))
} else {
drop(shutdown_rx);
None
};
Self {
config,
active_transactions,
transaction_log,
processed_events,
checkpoints,
stats,
last_checkpoint,
command_tx,
shutdown_tx: Some(shutdown_tx),
_background_task: background_task,
}
}
pub async fn begin_transaction(
&self,
isolation_level: Option<IsolationLevel>,
) -> Result<String> {
let transaction_id = Uuid::new_v4().to_string();
let metadata = TransactionMetadata {
transaction_id: transaction_id.clone(),
start_time: Utc::now(),
end_time: None,
state: TransactionState::Preparing,
isolation_level: isolation_level.unwrap_or(self.config.default_isolation_level),
participants: Vec::new(),
event_count: 0,
timeout: self.config.transaction_timeout,
properties: HashMap::new(),
};
self.active_transactions
.insert(transaction_id.clone(), Arc::new(RwLock::new(metadata)));
if self.config.enable_wal {
self.write_wal_entry(LogEntryType::Begin, &transaction_id, Vec::new())
.await?;
}
let mut stats = self.stats.write();
stats.transactions_started += 1;
stats.active_transactions = self.active_transactions.len();
info!("Transaction {} started", transaction_id);
Ok(transaction_id)
}
pub async fn add_events(&self, transaction_id: &str, events: Vec<StreamEvent>) -> Result<()> {
let tx = self
.active_transactions
.get(transaction_id)
.ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
{
let mut metadata = tx.write();
if metadata.state != TransactionState::Preparing {
return Err(anyhow!(
"Cannot add events to transaction in state: {:?}",
metadata.state
));
}
if metadata.event_count + events.len() > self.config.max_transaction_size {
return Err(anyhow!("Transaction size limit exceeded"));
}
if self.config.enable_exactly_once {
for event in &events {
let event_id = self.get_event_id(event);
if self.processed_events.contains_key(&event_id) {
warn!("Duplicate event detected: {}", event_id);
self.stats.write().duplicates_detected += 1;
continue;
}
}
}
metadata.event_count += events.len();
}
if self.config.enable_wal {
self.write_wal_entry(LogEntryType::EventAdded, transaction_id, events.clone())
.await?;
}
debug!(
"Added {} events to transaction {}",
events.len(),
transaction_id
);
Ok(())
}
pub async fn prepare_transaction(&self, transaction_id: &str) -> Result<bool> {
let tx = self
.active_transactions
.get(transaction_id)
.ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
{
let mut metadata = tx.write();
metadata.state = TransactionState::Prepared;
}
if self.config.enable_wal {
self.write_wal_entry(LogEntryType::Prepare, transaction_id, Vec::new())
.await?;
}
info!("Transaction {} prepared", transaction_id);
Ok(true)
}
pub async fn commit_transaction(&self, transaction_id: &str) -> Result<()> {
let tx = self
.active_transactions
.get(transaction_id)
.ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
let start_time = {
let metadata = tx.read();
metadata.start_time
};
{
let mut metadata = tx.write();
if metadata.state != TransactionState::Prepared
&& metadata.state != TransactionState::Preparing
{
return Err(anyhow!(
"Cannot commit transaction in state: {:?}",
metadata.state
));
}
metadata.state = TransactionState::Committing;
}
if self.config.enable_wal {
self.write_wal_entry(LogEntryType::Commit, transaction_id, Vec::new())
.await?;
}
{
let mut metadata = tx.write();
metadata.state = TransactionState::Committed;
metadata.end_time = Some(Utc::now());
}
let duration = Utc::now()
.signed_duration_since(start_time)
.num_milliseconds() as u64;
drop(tx);
let mut stats = self.stats.write();
stats.transactions_committed += 1;
stats.max_transaction_duration_ms = stats.max_transaction_duration_ms.max(duration);
stats.avg_transaction_duration_ms =
(stats.avg_transaction_duration_ms + duration as f64) / 2.0;
self.active_transactions.remove(transaction_id);
stats.active_transactions = self.active_transactions.len();
#[cfg(not(test))]
info!("Transaction {} committed in {}ms", transaction_id, duration);
Ok(())
}
pub async fn abort_transaction(&self, transaction_id: &str) -> Result<()> {
let tx = self
.active_transactions
.get(transaction_id)
.ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
{
let mut metadata = tx.write();
metadata.state = TransactionState::Aborting;
}
if self.config.enable_wal {
self.write_wal_entry(LogEntryType::Abort, transaction_id, Vec::new())
.await?;
}
{
let mut metadata = tx.write();
metadata.state = TransactionState::Aborted;
metadata.end_time = Some(Utc::now());
}
drop(tx);
let mut stats = self.stats.write();
stats.transactions_aborted += 1;
self.active_transactions.remove(transaction_id);
stats.active_transactions = self.active_transactions.len();
#[cfg(not(test))]
info!("Transaction {} aborted", transaction_id);
Ok(())
}
pub fn is_event_processed(&self, event: &StreamEvent) -> bool {
let event_id = self.get_event_id(event);
self.processed_events.contains_key(&event_id)
}
pub fn mark_event_processed(&self, event: &StreamEvent) {
let event_id = self.get_event_id(event);
self.processed_events.insert(event_id, Utc::now());
}
fn get_event_id(&self, event: &StreamEvent) -> String {
let metadata = match event {
StreamEvent::TripleAdded { metadata, .. }
| StreamEvent::TripleRemoved { metadata, .. }
| StreamEvent::QuadAdded { metadata, .. }
| StreamEvent::QuadRemoved { metadata, .. }
| StreamEvent::GraphCreated { metadata, .. }
| StreamEvent::GraphCleared { metadata, .. }
| StreamEvent::GraphDeleted { metadata, .. }
| StreamEvent::GraphMetadataUpdated { metadata, .. }
| StreamEvent::GraphPermissionsChanged { metadata, .. }
| StreamEvent::GraphStatisticsUpdated { metadata, .. }
| StreamEvent::GraphRenamed { metadata, .. }
| StreamEvent::GraphMerged { metadata, .. }
| StreamEvent::GraphSplit { metadata, .. }
| StreamEvent::SparqlUpdate { metadata, .. }
| StreamEvent::QueryCompleted { metadata, .. }
| StreamEvent::QueryResultAdded { metadata, .. }
| StreamEvent::QueryResultRemoved { metadata, .. }
| StreamEvent::TransactionBegin { metadata, .. }
| StreamEvent::TransactionCommit { metadata, .. }
| StreamEvent::TransactionAbort { metadata, .. }
| StreamEvent::SchemaChanged { metadata, .. }
| StreamEvent::SchemaDefinitionAdded { metadata, .. }
| StreamEvent::SchemaDefinitionRemoved { metadata, .. }
| StreamEvent::SchemaDefinitionModified { metadata, .. }
| StreamEvent::OntologyImported { metadata, .. }
| StreamEvent::OntologyRemoved { metadata, .. }
| StreamEvent::ConstraintAdded { metadata, .. }
| StreamEvent::ConstraintRemoved { metadata, .. }
| StreamEvent::ConstraintViolated { metadata, .. }
| StreamEvent::IndexCreated { metadata, .. }
| StreamEvent::IndexDropped { metadata, .. }
| StreamEvent::IndexRebuilt { metadata, .. }
| StreamEvent::SchemaUpdated { metadata, .. }
| StreamEvent::ShapeAdded { metadata, .. }
| StreamEvent::ShapeRemoved { metadata, .. }
| StreamEvent::ShapeModified { metadata, .. }
| StreamEvent::ShapeUpdated { metadata, .. }
| StreamEvent::ShapeValidationStarted { metadata, .. }
| StreamEvent::ShapeValidationCompleted { metadata, .. }
| StreamEvent::ShapeViolationDetected { metadata, .. }
| StreamEvent::Heartbeat { metadata, .. }
| StreamEvent::ErrorOccurred { metadata, .. } => metadata,
};
metadata.event_id.clone()
}
async fn write_wal_entry(
&self,
entry_type: LogEntryType,
transaction_id: &str,
events: Vec<StreamEvent>,
) -> Result<()> {
let mut log = self.transaction_log.write();
let entry = TransactionLogEntry {
id: log.len() as u64,
transaction_id: transaction_id.to_string(),
entry_type,
timestamp: Utc::now(),
events,
checksum: self.compute_checksum(transaction_id),
};
log.push(entry);
let mut stats = self.stats.write();
stats.wal_entries_written += 1;
Ok(())
}
fn compute_checksum(&self, data: &str) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(data.as_bytes());
hex::encode(hasher.finalize())
}
pub async fn create_checkpoint(&self) -> Result<String> {
let _ = self.command_tx.send(TransactionCommand::Checkpoint);
Ok("Checkpoint scheduled".to_string())
}
async fn create_checkpoint_internal(
active_transactions: &Arc<DashMap<String, Arc<RwLock<TransactionMetadata>>>>,
transaction_log: &Arc<RwLock<Vec<TransactionLogEntry>>>,
checkpoints: &Arc<RwLock<Vec<TransactionCheckpoint>>>,
stats: &Arc<RwLock<TransactionalStats>>,
last_checkpoint: &Arc<RwLock<Instant>>,
) -> Result<()> {
let checkpoint_id = Uuid::new_v4().to_string();
let active_tx_ids: Vec<String> = active_transactions
.iter()
.map(|entry| entry.key().clone())
.collect();
let event_offset = transaction_log.read().len() as u64;
let checkpoint = TransactionCheckpoint {
checkpoint_id: checkpoint_id.clone(),
timestamp: Utc::now(),
active_transactions: active_tx_ids,
last_committed_id: None,
event_offset,
};
checkpoints.write().push(checkpoint);
*last_checkpoint.write() = Instant::now();
let mut stats_guard = stats.write();
stats_guard.checkpoints_created += 1;
info!(
"Checkpoint {} created at offset {}",
checkpoint_id, event_offset
);
Ok(())
}
async fn cleanup_expired_events(
processed_events: &Arc<DashMap<String, DateTime<Utc>>>,
config: &TransactionalConfig,
) {
let cutoff = Utc::now()
- chrono::Duration::from_std(config.idempotency_window)
.expect("idempotency_window should be valid chrono Duration");
processed_events.retain(|_, timestamp| *timestamp > cutoff);
debug!(
"Cleaned up expired events, {} remaining",
processed_events.len()
);
}
pub fn get_transaction_status(&self, transaction_id: &str) -> Option<TransactionState> {
self.active_transactions
.get(transaction_id)
.map(|tx| tx.read().state.clone())
}
pub fn get_stats(&self) -> TransactionalStats {
let mut stats = self.stats.read().clone();
stats.active_transactions = self.active_transactions.len();
stats
}
pub async fn recover_from_checkpoint(&self, checkpoint_id: &str) -> Result<()> {
let checkpoints = self.checkpoints.read();
let checkpoint = checkpoints
.iter()
.find(|cp| cp.checkpoint_id == checkpoint_id)
.ok_or_else(|| anyhow!("Checkpoint not found: {}", checkpoint_id))?;
for tx_id in &checkpoint.active_transactions {
info!("Recovering transaction: {}", tx_id);
}
info!(
"Recovered from checkpoint {} at offset {}",
checkpoint_id, checkpoint.event_offset
);
Ok(())
}
pub async fn shutdown(&mut self) -> Result<()> {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
if let Some(task) = self._background_task.take() {
let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
}
info!("Transactional processor shut down");
Ok(())
}
}
impl Drop for TransactionalProcessor {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventMetadata;
#[tokio::test]
async fn test_transaction_lifecycle() {
let config = TransactionalConfig {
enable_background_tasks: false,
enable_wal: false,
..Default::default()
};
let processor = TransactionalProcessor::new(config);
let tx_id = processor
.begin_transaction(Some(IsolationLevel::ReadCommitted))
.await
.unwrap();
assert!(processor.active_transactions.contains_key(&tx_id));
let event = StreamEvent::SchemaChanged {
schema_type: crate::event::SchemaType::Ontology,
change_type: crate::event::SchemaChangeType::Added,
details: "test schema change".to_string(),
metadata: EventMetadata {
event_id: Uuid::new_v4().to_string(),
timestamp: Utc::now(),
source: "test".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
},
};
processor
.add_events(&tx_id, vec![event.clone()])
.await
.unwrap();
assert!(processor.prepare_transaction(&tx_id).await.unwrap());
processor.commit_transaction(&tx_id).await.unwrap();
assert!(!processor.active_transactions.contains_key(&tx_id));
let stats = processor.get_stats();
assert_eq!(stats.transactions_started, 1);
assert_eq!(stats.transactions_committed, 1);
}
#[tokio::test]
async fn test_transaction_abort() {
let config = TransactionalConfig {
enable_background_tasks: false,
enable_wal: false,
..Default::default()
};
let processor = TransactionalProcessor::new(config);
let tx_id = processor.begin_transaction(None).await.unwrap();
processor.abort_transaction(&tx_id).await.unwrap();
assert!(!processor.active_transactions.contains_key(&tx_id));
let stats = processor.get_stats();
assert_eq!(stats.transactions_started, 1);
assert_eq!(stats.transactions_aborted, 1);
}
#[tokio::test]
async fn test_minimal() {
let config = TransactionalConfig {
enable_background_tasks: false,
enable_wal: false,
..Default::default()
};
let _processor = TransactionalProcessor::new(config);
}
#[tokio::test]
async fn test_begin_only() {
let config = TransactionalConfig {
enable_background_tasks: false,
enable_wal: false,
..Default::default()
};
let processor = TransactionalProcessor::new(config);
let _tx_id = processor.begin_transaction(None).await.unwrap();
}
#[tokio::test]
async fn test_begin_prepare_only() {
let config = TransactionalConfig {
enable_background_tasks: false,
enable_wal: false,
..Default::default()
};
let processor = TransactionalProcessor::new(config);
let tx_id = processor.begin_transaction(None).await.unwrap();
processor.prepare_transaction(&tx_id).await.unwrap();
}
#[tokio::test]
async fn test_begin_prepare_commit() {
let config = TransactionalConfig {
enable_background_tasks: false,
enable_wal: false,
..Default::default()
};
let processor = TransactionalProcessor::new(config);
let tx_id = processor.begin_transaction(None).await.unwrap();
processor.prepare_transaction(&tx_id).await.unwrap();
processor.commit_transaction(&tx_id).await.unwrap();
}
#[tokio::test]
async fn test_idempotency() {
let config = TransactionalConfig {
enable_background_tasks: false,
..Default::default()
};
let processor = TransactionalProcessor::new(config);
let event = StreamEvent::SchemaChanged {
schema_type: crate::event::SchemaType::Ontology,
change_type: crate::event::SchemaChangeType::Added,
details: "test schema change".to_string(),
metadata: EventMetadata {
event_id: "test-event-123".to_string(),
timestamp: Utc::now(),
source: "test".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
},
};
assert!(!processor.is_event_processed(&event));
processor.mark_event_processed(&event);
assert!(processor.is_event_processed(&event));
}
}