1use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::sync::mpsc;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use crate::event::StreamEvent;
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40pub enum IsolationLevel {
41 ReadUncommitted,
43 ReadCommitted,
45 RepeatableRead,
47 Serializable,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
53pub enum TransactionState {
54 Preparing,
56 Prepared,
58 Committing,
60 Committed,
62 Aborting,
64 Aborted,
66 Failed { reason: String },
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct TransactionMetadata {
73 pub transaction_id: String,
75 pub start_time: DateTime<Utc>,
77 pub end_time: Option<DateTime<Utc>>,
79 pub state: TransactionState,
81 pub isolation_level: IsolationLevel,
83 pub participants: Vec<String>,
85 pub event_count: usize,
87 pub timeout: Duration,
89 pub properties: HashMap<String, String>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct TransactionLogEntry {
96 pub id: u64,
98 pub transaction_id: String,
100 pub entry_type: LogEntryType,
102 pub timestamp: DateTime<Utc>,
104 pub events: Vec<StreamEvent>,
106 pub checksum: String,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum LogEntryType {
113 Begin,
115 EventAdded,
117 Prepare,
119 Commit,
121 Abort,
123 Checkpoint,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct TransactionCheckpoint {
130 pub checkpoint_id: String,
132 pub timestamp: DateTime<Utc>,
134 pub active_transactions: Vec<String>,
136 pub last_committed_id: Option<String>,
138 pub event_offset: u64,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct TransactionalConfig {
145 pub enable_exactly_once: bool,
147 pub default_isolation_level: IsolationLevel,
149 pub transaction_timeout: Duration,
151 pub enable_wal: bool,
153 pub wal_sync_interval: Duration,
155 pub checkpoint_interval: Duration,
157 pub max_transaction_size: usize,
159 pub idempotency_window: Duration,
161 pub enable_distributed: bool,
163 pub two_phase_commit_timeout: Duration,
165 pub enable_background_tasks: bool,
167}
168
169impl Default for TransactionalConfig {
170 fn default() -> Self {
171 Self {
172 enable_exactly_once: true,
173 default_isolation_level: IsolationLevel::ReadCommitted,
174 transaction_timeout: Duration::from_secs(60),
175 enable_wal: true,
176 wal_sync_interval: Duration::from_millis(100),
177 checkpoint_interval: Duration::from_secs(300),
178 max_transaction_size: 10000,
179 idempotency_window: Duration::from_secs(3600),
180 enable_distributed: false,
181 two_phase_commit_timeout: Duration::from_secs(30),
182 enable_background_tasks: true,
183 }
184 }
185}
186
/// Counters and gauges describing processor activity.
///
/// All fields start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct TransactionalStats {
    /// Number of transactions started.
    pub transactions_started: u64,
    /// Number of transactions committed.
    pub transactions_committed: u64,
    /// Number of transactions aborted.
    pub transactions_aborted: u64,
    /// Number of events processed.
    ///
    /// NOTE(review): never updated in this file.
    pub events_processed: u64,
    /// Number of duplicate events seen by `add_events`.
    pub duplicates_detected: u64,
    /// Averaged duration of committed transactions, in milliseconds.
    pub avg_transaction_duration_ms: f64,
    /// Longest observed duration of a committed transaction, in milliseconds.
    pub max_transaction_duration_ms: u64,
    /// Number of transactions currently tracked as active.
    pub active_transactions: usize,
    /// Number of entries appended to the transaction log.
    pub wal_entries_written: u64,
    /// Number of checkpoints created.
    pub checkpoints_created: u64,
    /// Number of failed two-phase commits.
    ///
    /// NOTE(review): never updated in this file.
    pub two_phase_commit_failures: u64,
}
213
214pub struct TransactionalProcessor {
216 config: TransactionalConfig,
218 active_transactions: Arc<DashMap<String, Arc<RwLock<TransactionMetadata>>>>,
220 transaction_log: Arc<RwLock<Vec<TransactionLogEntry>>>,
222 processed_events: Arc<DashMap<String, DateTime<Utc>>>,
224 checkpoints: Arc<RwLock<Vec<TransactionCheckpoint>>>,
226 stats: Arc<RwLock<TransactionalStats>>,
228 last_checkpoint: Arc<RwLock<Instant>>,
230 command_tx: mpsc::UnboundedSender<TransactionCommand>,
232 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
234 _background_task: Option<tokio::task::JoinHandle<()>>,
236}
237
/// Commands accepted by the background maintenance task.
enum TransactionCommand {
    /// Create a checkpoint immediately.
    Checkpoint,
    /// Purge processed-event ids older than the idempotency window.
    CleanupExpired,
    /// Trigger a WAL sync (the handler currently only logs).
    SyncWal,
    /// Stop the background task.
    Shutdown,
}
245
246impl TransactionalProcessor {
247 pub fn new(config: TransactionalConfig) -> Self {
249 let (command_tx, mut command_rx) = mpsc::unbounded_channel();
250 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
251
252 let active_transactions = Arc::new(DashMap::new());
253 let transaction_log = Arc::new(RwLock::new(Vec::new()));
254 let processed_events = Arc::new(DashMap::new());
255 let checkpoints = Arc::new(RwLock::new(Vec::new()));
256 let stats = Arc::new(RwLock::new(TransactionalStats::default()));
257 let last_checkpoint = Arc::new(RwLock::new(Instant::now()));
258
259 let background_task = if config.enable_background_tasks {
261 let active_transactions_clone = active_transactions.clone();
263 let transaction_log_clone = transaction_log.clone();
264 let checkpoints_clone = checkpoints.clone();
265 let stats_clone = stats.clone();
266 let last_checkpoint_clone = last_checkpoint.clone();
267 let processed_events_clone = processed_events.clone();
268 let config_clone = config.clone();
269
270 Some(tokio::spawn(async move {
271 let mut checkpoint_interval =
272 tokio::time::interval(config_clone.checkpoint_interval);
273 let mut cleanup_interval = tokio::time::interval(config_clone.idempotency_window);
274 let mut wal_sync_interval = tokio::time::interval(config_clone.wal_sync_interval);
275
276 loop {
277 tokio::select! {
278 _ = &mut shutdown_rx => {
279 debug!("Transactional processor background task shutting down");
280 break;
281 }
282 _ = checkpoint_interval.tick() => {
283 if let Err(e) = Self::create_checkpoint_internal(
284 &active_transactions_clone,
285 &transaction_log_clone,
286 &checkpoints_clone,
287 &stats_clone,
288 &last_checkpoint_clone,
289 ).await {
290 error!("Failed to create checkpoint: {}", e);
291 }
292 }
293 _ = cleanup_interval.tick() => {
294 Self::cleanup_expired_events(&processed_events_clone, &config_clone).await;
295 }
296 _ = wal_sync_interval.tick() => {
297 debug!("WAL sync triggered");
299 }
300 Some(cmd) = command_rx.recv() => {
301 match cmd {
302 TransactionCommand::Checkpoint => {
303 if let Err(e) = Self::create_checkpoint_internal(
304 &active_transactions_clone,
305 &transaction_log_clone,
306 &checkpoints_clone,
307 &stats_clone,
308 &last_checkpoint_clone,
309 ).await {
310 error!("Manual checkpoint failed: {}", e);
311 }
312 }
313 TransactionCommand::CleanupExpired => {
314 Self::cleanup_expired_events(&processed_events_clone, &config_clone).await;
315 }
316 TransactionCommand::SyncWal => {
317 debug!("Manual WAL sync triggered");
318 }
319 TransactionCommand::Shutdown => {
320 debug!("Shutdown command received");
321 break;
322 }
323 }
324 }
325 }
326 }
327 }))
328 } else {
329 drop(shutdown_rx);
332 None
333 };
334
335 Self {
336 config,
337 active_transactions,
338 transaction_log,
339 processed_events,
340 checkpoints,
341 stats,
342 last_checkpoint,
343 command_tx,
344 shutdown_tx: Some(shutdown_tx),
345 _background_task: background_task,
346 }
347 }
348
349 pub async fn begin_transaction(
351 &self,
352 isolation_level: Option<IsolationLevel>,
353 ) -> Result<String> {
354 let transaction_id = Uuid::new_v4().to_string();
355
356 let metadata = TransactionMetadata {
357 transaction_id: transaction_id.clone(),
358 start_time: Utc::now(),
359 end_time: None,
360 state: TransactionState::Preparing,
361 isolation_level: isolation_level.unwrap_or(self.config.default_isolation_level),
362 participants: Vec::new(),
363 event_count: 0,
364 timeout: self.config.transaction_timeout,
365 properties: HashMap::new(),
366 };
367
368 self.active_transactions
369 .insert(transaction_id.clone(), Arc::new(RwLock::new(metadata)));
370
371 if self.config.enable_wal {
373 self.write_wal_entry(LogEntryType::Begin, &transaction_id, Vec::new())
374 .await?;
375 }
376
377 let mut stats = self.stats.write();
379 stats.transactions_started += 1;
380 stats.active_transactions = self.active_transactions.len();
381
382 info!("Transaction {} started", transaction_id);
383 Ok(transaction_id)
384 }
385
386 pub async fn add_events(&self, transaction_id: &str, events: Vec<StreamEvent>) -> Result<()> {
388 let tx = self
389 .active_transactions
390 .get(transaction_id)
391 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
392
393 {
395 let mut metadata = tx.write();
396
397 if metadata.state != TransactionState::Preparing {
399 return Err(anyhow!(
400 "Cannot add events to transaction in state: {:?}",
401 metadata.state
402 ));
403 }
404
405 if metadata.event_count + events.len() > self.config.max_transaction_size {
407 return Err(anyhow!("Transaction size limit exceeded"));
408 }
409
410 if self.config.enable_exactly_once {
412 for event in &events {
413 let event_id = self.get_event_id(event);
414 if self.processed_events.contains_key(&event_id) {
415 warn!("Duplicate event detected: {}", event_id);
416 self.stats.write().duplicates_detected += 1;
417 continue;
418 }
419 }
420 }
421
422 metadata.event_count += events.len();
423 } if self.config.enable_wal {
427 self.write_wal_entry(LogEntryType::EventAdded, transaction_id, events.clone())
428 .await?;
429 }
430
431 debug!(
432 "Added {} events to transaction {}",
433 events.len(),
434 transaction_id
435 );
436 Ok(())
437 }
438
439 pub async fn prepare_transaction(&self, transaction_id: &str) -> Result<bool> {
441 let tx = self
442 .active_transactions
443 .get(transaction_id)
444 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
445
446 {
448 let mut metadata = tx.write();
449
450 metadata.state = TransactionState::Prepared;
452 } if self.config.enable_wal {
456 self.write_wal_entry(LogEntryType::Prepare, transaction_id, Vec::new())
457 .await?;
458 }
459
460 info!("Transaction {} prepared", transaction_id);
461 Ok(true)
462 }
463
464 pub async fn commit_transaction(&self, transaction_id: &str) -> Result<()> {
466 let tx = self
467 .active_transactions
468 .get(transaction_id)
469 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
470
471 let start_time = {
472 let metadata = tx.read();
473 metadata.start_time
474 };
475
476 {
477 let mut metadata = tx.write();
478
479 if metadata.state != TransactionState::Prepared
481 && metadata.state != TransactionState::Preparing
482 {
483 return Err(anyhow!(
484 "Cannot commit transaction in state: {:?}",
485 metadata.state
486 ));
487 }
488
489 metadata.state = TransactionState::Committing;
490 }
491
492 if self.config.enable_wal {
494 self.write_wal_entry(LogEntryType::Commit, transaction_id, Vec::new())
495 .await?;
496 }
497
498 {
500 let mut metadata = tx.write();
501 metadata.state = TransactionState::Committed;
502 metadata.end_time = Some(Utc::now());
503 }
504
505 let duration = Utc::now()
507 .signed_duration_since(start_time)
508 .num_milliseconds() as u64;
509
510 drop(tx);
512
513 let mut stats = self.stats.write();
514 stats.transactions_committed += 1;
515 stats.max_transaction_duration_ms = stats.max_transaction_duration_ms.max(duration);
516 stats.avg_transaction_duration_ms =
517 (stats.avg_transaction_duration_ms + duration as f64) / 2.0;
518
519 self.active_transactions.remove(transaction_id);
521 stats.active_transactions = self.active_transactions.len();
522
523 #[cfg(not(test))]
525 info!("Transaction {} committed in {}ms", transaction_id, duration);
526 Ok(())
527 }
528
529 pub async fn abort_transaction(&self, transaction_id: &str) -> Result<()> {
531 let tx = self
532 .active_transactions
533 .get(transaction_id)
534 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
535
536 {
537 let mut metadata = tx.write();
538 metadata.state = TransactionState::Aborting;
539 }
540
541 if self.config.enable_wal {
543 self.write_wal_entry(LogEntryType::Abort, transaction_id, Vec::new())
544 .await?;
545 }
546
547 {
549 let mut metadata = tx.write();
550 metadata.state = TransactionState::Aborted;
551 metadata.end_time = Some(Utc::now());
552 }
553
554 drop(tx);
556
557 let mut stats = self.stats.write();
559 stats.transactions_aborted += 1;
560
561 self.active_transactions.remove(transaction_id);
563 stats.active_transactions = self.active_transactions.len();
564
565 #[cfg(not(test))]
566 info!("Transaction {} aborted", transaction_id);
567 Ok(())
568 }
569
570 pub fn is_event_processed(&self, event: &StreamEvent) -> bool {
572 let event_id = self.get_event_id(event);
573 self.processed_events.contains_key(&event_id)
574 }
575
576 pub fn mark_event_processed(&self, event: &StreamEvent) {
578 let event_id = self.get_event_id(event);
579 self.processed_events.insert(event_id, Utc::now());
580 }
581
582 fn get_event_id(&self, event: &StreamEvent) -> String {
584 let metadata = match event {
586 StreamEvent::TripleAdded { metadata, .. }
587 | StreamEvent::TripleRemoved { metadata, .. }
588 | StreamEvent::QuadAdded { metadata, .. }
589 | StreamEvent::QuadRemoved { metadata, .. }
590 | StreamEvent::GraphCreated { metadata, .. }
591 | StreamEvent::GraphCleared { metadata, .. }
592 | StreamEvent::GraphDeleted { metadata, .. }
593 | StreamEvent::GraphMetadataUpdated { metadata, .. }
594 | StreamEvent::GraphPermissionsChanged { metadata, .. }
595 | StreamEvent::GraphStatisticsUpdated { metadata, .. }
596 | StreamEvent::GraphRenamed { metadata, .. }
597 | StreamEvent::GraphMerged { metadata, .. }
598 | StreamEvent::GraphSplit { metadata, .. }
599 | StreamEvent::SparqlUpdate { metadata, .. }
600 | StreamEvent::QueryCompleted { metadata, .. }
601 | StreamEvent::QueryResultAdded { metadata, .. }
602 | StreamEvent::QueryResultRemoved { metadata, .. }
603 | StreamEvent::TransactionBegin { metadata, .. }
604 | StreamEvent::TransactionCommit { metadata, .. }
605 | StreamEvent::TransactionAbort { metadata, .. }
606 | StreamEvent::SchemaChanged { metadata, .. }
607 | StreamEvent::SchemaDefinitionAdded { metadata, .. }
608 | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
609 | StreamEvent::SchemaDefinitionModified { metadata, .. }
610 | StreamEvent::OntologyImported { metadata, .. }
611 | StreamEvent::OntologyRemoved { metadata, .. }
612 | StreamEvent::ConstraintAdded { metadata, .. }
613 | StreamEvent::ConstraintRemoved { metadata, .. }
614 | StreamEvent::ConstraintViolated { metadata, .. }
615 | StreamEvent::IndexCreated { metadata, .. }
616 | StreamEvent::IndexDropped { metadata, .. }
617 | StreamEvent::IndexRebuilt { metadata, .. }
618 | StreamEvent::SchemaUpdated { metadata, .. }
619 | StreamEvent::ShapeAdded { metadata, .. }
620 | StreamEvent::ShapeRemoved { metadata, .. }
621 | StreamEvent::ShapeModified { metadata, .. }
622 | StreamEvent::ShapeUpdated { metadata, .. }
623 | StreamEvent::ShapeValidationStarted { metadata, .. }
624 | StreamEvent::ShapeValidationCompleted { metadata, .. }
625 | StreamEvent::ShapeViolationDetected { metadata, .. }
626 | StreamEvent::Heartbeat { metadata, .. }
627 | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
628 };
629 metadata.event_id.clone()
630 }
631
632 async fn write_wal_entry(
634 &self,
635 entry_type: LogEntryType,
636 transaction_id: &str,
637 events: Vec<StreamEvent>,
638 ) -> Result<()> {
639 let mut log = self.transaction_log.write();
640
641 let entry = TransactionLogEntry {
642 id: log.len() as u64,
643 transaction_id: transaction_id.to_string(),
644 entry_type,
645 timestamp: Utc::now(),
646 events,
647 checksum: self.compute_checksum(transaction_id),
648 };
649
650 log.push(entry);
651
652 let mut stats = self.stats.write();
653 stats.wal_entries_written += 1;
654
655 Ok(())
656 }
657
658 fn compute_checksum(&self, data: &str) -> String {
660 use sha2::{Digest, Sha256};
662 let mut hasher = Sha256::new();
663 hasher.update(data.as_bytes());
664 hex::encode(hasher.finalize())
665 }
666
667 pub async fn create_checkpoint(&self) -> Result<String> {
669 let _ = self.command_tx.send(TransactionCommand::Checkpoint);
670 Ok("Checkpoint scheduled".to_string())
671 }
672
673 async fn create_checkpoint_internal(
675 active_transactions: &Arc<DashMap<String, Arc<RwLock<TransactionMetadata>>>>,
676 transaction_log: &Arc<RwLock<Vec<TransactionLogEntry>>>,
677 checkpoints: &Arc<RwLock<Vec<TransactionCheckpoint>>>,
678 stats: &Arc<RwLock<TransactionalStats>>,
679 last_checkpoint: &Arc<RwLock<Instant>>,
680 ) -> Result<()> {
681 let checkpoint_id = Uuid::new_v4().to_string();
682
683 let active_tx_ids: Vec<String> = active_transactions
684 .iter()
685 .map(|entry| entry.key().clone())
686 .collect();
687
688 let event_offset = transaction_log.read().len() as u64;
689
690 let checkpoint = TransactionCheckpoint {
691 checkpoint_id: checkpoint_id.clone(),
692 timestamp: Utc::now(),
693 active_transactions: active_tx_ids,
694 last_committed_id: None,
695 event_offset,
696 };
697
698 checkpoints.write().push(checkpoint);
699 *last_checkpoint.write() = Instant::now();
700
701 let mut stats_guard = stats.write();
702 stats_guard.checkpoints_created += 1;
703
704 info!(
705 "Checkpoint {} created at offset {}",
706 checkpoint_id, event_offset
707 );
708 Ok(())
709 }
710
711 async fn cleanup_expired_events(
713 processed_events: &Arc<DashMap<String, DateTime<Utc>>>,
714 config: &TransactionalConfig,
715 ) {
716 let cutoff = Utc::now() - chrono::Duration::from_std(config.idempotency_window).unwrap();
717
718 processed_events.retain(|_, timestamp| *timestamp > cutoff);
719
720 debug!(
721 "Cleaned up expired events, {} remaining",
722 processed_events.len()
723 );
724 }
725
726 pub fn get_transaction_status(&self, transaction_id: &str) -> Option<TransactionState> {
728 self.active_transactions
729 .get(transaction_id)
730 .map(|tx| tx.read().state.clone())
731 }
732
733 pub fn get_stats(&self) -> TransactionalStats {
735 let mut stats = self.stats.read().clone();
736 stats.active_transactions = self.active_transactions.len();
737 stats
738 }
739
740 pub async fn recover_from_checkpoint(&self, checkpoint_id: &str) -> Result<()> {
742 let checkpoints = self.checkpoints.read();
743
744 let checkpoint = checkpoints
745 .iter()
746 .find(|cp| cp.checkpoint_id == checkpoint_id)
747 .ok_or_else(|| anyhow!("Checkpoint not found: {}", checkpoint_id))?;
748
749 for tx_id in &checkpoint.active_transactions {
751 info!("Recovering transaction: {}", tx_id);
752 }
754
755 info!(
756 "Recovered from checkpoint {} at offset {}",
757 checkpoint_id, checkpoint.event_offset
758 );
759 Ok(())
760 }
761
762 pub async fn shutdown(&mut self) -> Result<()> {
764 if let Some(shutdown_tx) = self.shutdown_tx.take() {
766 let _ = shutdown_tx.send(());
767 }
768
769 if let Some(task) = self._background_task.take() {
771 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
772 }
773
774 info!("Transactional processor shut down");
775 Ok(())
776 }
777}
778
779impl Drop for TransactionalProcessor {
780 fn drop(&mut self) {
781 if let Some(shutdown_tx) = self.shutdown_tx.take() {
783 let _ = shutdown_tx.send(());
784 }
785 }
786}
787
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Configuration with background tasks disabled and the WAL switched as requested.
    fn quiet_config(enable_wal: bool) -> TransactionalConfig {
        TransactionalConfig {
            enable_background_tasks: false,
            enable_wal,
            ..Default::default()
        }
    }

    /// Processor without background tasks and without WAL.
    fn quiet_processor() -> TransactionalProcessor {
        TransactionalProcessor::new(quiet_config(false))
    }

    /// Schema-change event carrying the given event id.
    fn schema_event(event_id: String) -> StreamEvent {
        StreamEvent::SchemaChanged {
            schema_type: crate::event::SchemaType::Ontology,
            change_type: crate::event::SchemaChangeType::Added,
            details: "test schema change".to_string(),
            metadata: EventMetadata {
                event_id,
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    /// Begin -> add events -> prepare -> commit, then check bookkeeping.
    #[tokio::test]
    async fn test_transaction_lifecycle() {
        let processor = quiet_processor();

        let tx_id = processor
            .begin_transaction(Some(IsolationLevel::ReadCommitted))
            .await
            .unwrap();
        assert!(processor.active_transactions.contains_key(&tx_id));

        let event = schema_event(Uuid::new_v4().to_string());
        processor.add_events(&tx_id, vec![event]).await.unwrap();

        assert!(processor.prepare_transaction(&tx_id).await.unwrap());

        processor.commit_transaction(&tx_id).await.unwrap();
        assert!(!processor.active_transactions.contains_key(&tx_id));

        let stats = processor.get_stats();
        assert_eq!(stats.transactions_started, 1);
        assert_eq!(stats.transactions_committed, 1);
    }

    /// Aborting removes the transaction and counts the abort.
    #[tokio::test]
    async fn test_transaction_abort() {
        let processor = quiet_processor();

        let tx_id = processor.begin_transaction(None).await.unwrap();
        processor.abort_transaction(&tx_id).await.unwrap();

        assert!(!processor.active_transactions.contains_key(&tx_id));

        let stats = processor.get_stats();
        assert_eq!(stats.transactions_started, 1);
        assert_eq!(stats.transactions_aborted, 1);
    }

    /// Construction alone must succeed.
    #[tokio::test]
    async fn test_minimal() {
        let _processor = quiet_processor();
    }

    /// Beginning a transaction must succeed.
    #[tokio::test]
    async fn test_begin_only() {
        let processor = quiet_processor();

        let _tx_id = processor.begin_transaction(None).await.unwrap();
    }

    /// Begin followed by prepare must succeed.
    #[tokio::test]
    async fn test_begin_prepare_only() {
        let processor = quiet_processor();

        let tx_id = processor.begin_transaction(None).await.unwrap();
        processor.prepare_transaction(&tx_id).await.unwrap();
    }

    /// Begin, prepare and commit must succeed.
    #[tokio::test]
    async fn test_begin_prepare_commit() {
        let processor = quiet_processor();

        let tx_id = processor.begin_transaction(None).await.unwrap();
        processor.prepare_transaction(&tx_id).await.unwrap();
        processor.commit_transaction(&tx_id).await.unwrap();
    }

    /// An event is reported as processed only after it has been marked.
    #[tokio::test]
    async fn test_idempotency() {
        // WAL stays at its default (enabled) for this test.
        let processor = TransactionalProcessor::new(quiet_config(true));

        let event = schema_event("test-event-123".to_string());

        assert!(!processor.is_event_processed(&event));

        processor.mark_event_processed(&event);

        assert!(processor.is_event_processed(&event));
    }
}