1use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::sync::mpsc;
33use tracing::{debug, error, info, warn};
34use uuid::Uuid;
35
36use crate::event::StreamEvent;
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40pub enum IsolationLevel {
41 ReadUncommitted,
43 ReadCommitted,
45 RepeatableRead,
47 Serializable,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
53pub enum TransactionState {
54 Preparing,
56 Prepared,
58 Committing,
60 Committed,
62 Aborting,
64 Aborted,
66 Failed { reason: String },
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct TransactionMetadata {
73 pub transaction_id: String,
75 pub start_time: DateTime<Utc>,
77 pub end_time: Option<DateTime<Utc>>,
79 pub state: TransactionState,
81 pub isolation_level: IsolationLevel,
83 pub participants: Vec<String>,
85 pub event_count: usize,
87 pub timeout: Duration,
89 pub properties: HashMap<String, String>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct TransactionLogEntry {
96 pub id: u64,
98 pub transaction_id: String,
100 pub entry_type: LogEntryType,
102 pub timestamp: DateTime<Utc>,
104 pub events: Vec<StreamEvent>,
106 pub checksum: String,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum LogEntryType {
113 Begin,
115 EventAdded,
117 Prepare,
119 Commit,
121 Abort,
123 Checkpoint,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct TransactionCheckpoint {
130 pub checkpoint_id: String,
132 pub timestamp: DateTime<Utc>,
134 pub active_transactions: Vec<String>,
136 pub last_committed_id: Option<String>,
138 pub event_offset: u64,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct TransactionalConfig {
145 pub enable_exactly_once: bool,
147 pub default_isolation_level: IsolationLevel,
149 pub transaction_timeout: Duration,
151 pub enable_wal: bool,
153 pub wal_sync_interval: Duration,
155 pub checkpoint_interval: Duration,
157 pub max_transaction_size: usize,
159 pub idempotency_window: Duration,
161 pub enable_distributed: bool,
163 pub two_phase_commit_timeout: Duration,
165 pub enable_background_tasks: bool,
167}
168
169impl Default for TransactionalConfig {
170 fn default() -> Self {
171 Self {
172 enable_exactly_once: true,
173 default_isolation_level: IsolationLevel::ReadCommitted,
174 transaction_timeout: Duration::from_secs(60),
175 enable_wal: true,
176 wal_sync_interval: Duration::from_millis(100),
177 checkpoint_interval: Duration::from_secs(300),
178 max_transaction_size: 10000,
179 idempotency_window: Duration::from_secs(3600),
180 enable_distributed: false,
181 two_phase_commit_timeout: Duration::from_secs(30),
182 enable_background_tasks: true,
183 }
184 }
185}
186
187#[derive(Debug, Clone, Default)]
189pub struct TransactionalStats {
190 pub transactions_started: u64,
192 pub transactions_committed: u64,
194 pub transactions_aborted: u64,
196 pub events_processed: u64,
198 pub duplicates_detected: u64,
200 pub avg_transaction_duration_ms: f64,
202 pub max_transaction_duration_ms: u64,
204 pub active_transactions: usize,
206 pub wal_entries_written: u64,
208 pub checkpoints_created: u64,
210 pub two_phase_commit_failures: u64,
212}
213
214pub struct TransactionalProcessor {
216 config: TransactionalConfig,
218 active_transactions: Arc<DashMap<String, Arc<RwLock<TransactionMetadata>>>>,
220 transaction_log: Arc<RwLock<Vec<TransactionLogEntry>>>,
222 processed_events: Arc<DashMap<String, DateTime<Utc>>>,
224 checkpoints: Arc<RwLock<Vec<TransactionCheckpoint>>>,
226 stats: Arc<RwLock<TransactionalStats>>,
228 last_checkpoint: Arc<RwLock<Instant>>,
230 command_tx: mpsc::UnboundedSender<TransactionCommand>,
232 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
234 _background_task: Option<tokio::task::JoinHandle<()>>,
236}
237
238enum TransactionCommand {
240 Checkpoint,
241 CleanupExpired,
242 SyncWal,
243 Shutdown,
244}
245
246impl TransactionalProcessor {
247 pub fn new(config: TransactionalConfig) -> Self {
249 let (command_tx, mut command_rx) = mpsc::unbounded_channel();
250 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
251
252 let active_transactions = Arc::new(DashMap::new());
253 let transaction_log = Arc::new(RwLock::new(Vec::new()));
254 let processed_events = Arc::new(DashMap::new());
255 let checkpoints = Arc::new(RwLock::new(Vec::new()));
256 let stats = Arc::new(RwLock::new(TransactionalStats::default()));
257 let last_checkpoint = Arc::new(RwLock::new(Instant::now()));
258
259 let background_task = if config.enable_background_tasks {
261 let active_transactions_clone = active_transactions.clone();
263 let transaction_log_clone = transaction_log.clone();
264 let checkpoints_clone = checkpoints.clone();
265 let stats_clone = stats.clone();
266 let last_checkpoint_clone = last_checkpoint.clone();
267 let processed_events_clone = processed_events.clone();
268 let config_clone = config.clone();
269
270 Some(tokio::spawn(async move {
271 let mut checkpoint_interval =
272 tokio::time::interval(config_clone.checkpoint_interval);
273 let mut cleanup_interval = tokio::time::interval(config_clone.idempotency_window);
274 let mut wal_sync_interval = tokio::time::interval(config_clone.wal_sync_interval);
275
276 loop {
277 tokio::select! {
278 _ = &mut shutdown_rx => {
279 debug!("Transactional processor background task shutting down");
280 break;
281 }
282 _ = checkpoint_interval.tick() => {
283 if let Err(e) = Self::create_checkpoint_internal(
284 &active_transactions_clone,
285 &transaction_log_clone,
286 &checkpoints_clone,
287 &stats_clone,
288 &last_checkpoint_clone,
289 ).await {
290 error!("Failed to create checkpoint: {}", e);
291 }
292 }
293 _ = cleanup_interval.tick() => {
294 Self::cleanup_expired_events(&processed_events_clone, &config_clone).await;
295 }
296 _ = wal_sync_interval.tick() => {
297 debug!("WAL sync triggered");
299 }
300 Some(cmd) = command_rx.recv() => {
301 match cmd {
302 TransactionCommand::Checkpoint => {
303 if let Err(e) = Self::create_checkpoint_internal(
304 &active_transactions_clone,
305 &transaction_log_clone,
306 &checkpoints_clone,
307 &stats_clone,
308 &last_checkpoint_clone,
309 ).await {
310 error!("Manual checkpoint failed: {}", e);
311 }
312 }
313 TransactionCommand::CleanupExpired => {
314 Self::cleanup_expired_events(&processed_events_clone, &config_clone).await;
315 }
316 TransactionCommand::SyncWal => {
317 debug!("Manual WAL sync triggered");
318 }
319 TransactionCommand::Shutdown => {
320 debug!("Shutdown command received");
321 break;
322 }
323 }
324 }
325 }
326 }
327 }))
328 } else {
329 drop(shutdown_rx);
332 None
333 };
334
335 Self {
336 config,
337 active_transactions,
338 transaction_log,
339 processed_events,
340 checkpoints,
341 stats,
342 last_checkpoint,
343 command_tx,
344 shutdown_tx: Some(shutdown_tx),
345 _background_task: background_task,
346 }
347 }
348
349 pub async fn begin_transaction(
351 &self,
352 isolation_level: Option<IsolationLevel>,
353 ) -> Result<String> {
354 let transaction_id = Uuid::new_v4().to_string();
355
356 let metadata = TransactionMetadata {
357 transaction_id: transaction_id.clone(),
358 start_time: Utc::now(),
359 end_time: None,
360 state: TransactionState::Preparing,
361 isolation_level: isolation_level.unwrap_or(self.config.default_isolation_level),
362 participants: Vec::new(),
363 event_count: 0,
364 timeout: self.config.transaction_timeout,
365 properties: HashMap::new(),
366 };
367
368 self.active_transactions
369 .insert(transaction_id.clone(), Arc::new(RwLock::new(metadata)));
370
371 if self.config.enable_wal {
373 self.write_wal_entry(LogEntryType::Begin, &transaction_id, Vec::new())
374 .await?;
375 }
376
377 let mut stats = self.stats.write();
379 stats.transactions_started += 1;
380 stats.active_transactions = self.active_transactions.len();
381
382 info!("Transaction {} started", transaction_id);
383 Ok(transaction_id)
384 }
385
386 pub async fn add_events(&self, transaction_id: &str, events: Vec<StreamEvent>) -> Result<()> {
388 let tx = self
389 .active_transactions
390 .get(transaction_id)
391 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
392
393 {
395 let mut metadata = tx.write();
396
397 if metadata.state != TransactionState::Preparing {
399 return Err(anyhow!(
400 "Cannot add events to transaction in state: {:?}",
401 metadata.state
402 ));
403 }
404
405 if metadata.event_count + events.len() > self.config.max_transaction_size {
407 return Err(anyhow!("Transaction size limit exceeded"));
408 }
409
410 if self.config.enable_exactly_once {
412 for event in &events {
413 let event_id = self.get_event_id(event);
414 if self.processed_events.contains_key(&event_id) {
415 warn!("Duplicate event detected: {}", event_id);
416 self.stats.write().duplicates_detected += 1;
417 continue;
418 }
419 }
420 }
421
422 metadata.event_count += events.len();
423 } if self.config.enable_wal {
427 self.write_wal_entry(LogEntryType::EventAdded, transaction_id, events.clone())
428 .await?;
429 }
430
431 debug!(
432 "Added {} events to transaction {}",
433 events.len(),
434 transaction_id
435 );
436 Ok(())
437 }
438
439 pub async fn prepare_transaction(&self, transaction_id: &str) -> Result<bool> {
441 let tx = self
442 .active_transactions
443 .get(transaction_id)
444 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
445
446 {
448 let mut metadata = tx.write();
449
450 metadata.state = TransactionState::Prepared;
452 } if self.config.enable_wal {
456 self.write_wal_entry(LogEntryType::Prepare, transaction_id, Vec::new())
457 .await?;
458 }
459
460 info!("Transaction {} prepared", transaction_id);
461 Ok(true)
462 }
463
464 pub async fn commit_transaction(&self, transaction_id: &str) -> Result<()> {
466 let tx = self
467 .active_transactions
468 .get(transaction_id)
469 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
470
471 let start_time = {
472 let metadata = tx.read();
473 metadata.start_time
474 };
475
476 {
477 let mut metadata = tx.write();
478
479 if metadata.state != TransactionState::Prepared
481 && metadata.state != TransactionState::Preparing
482 {
483 return Err(anyhow!(
484 "Cannot commit transaction in state: {:?}",
485 metadata.state
486 ));
487 }
488
489 metadata.state = TransactionState::Committing;
490 }
491
492 if self.config.enable_wal {
494 self.write_wal_entry(LogEntryType::Commit, transaction_id, Vec::new())
495 .await?;
496 }
497
498 {
500 let mut metadata = tx.write();
501 metadata.state = TransactionState::Committed;
502 metadata.end_time = Some(Utc::now());
503 }
504
505 let duration = Utc::now()
507 .signed_duration_since(start_time)
508 .num_milliseconds() as u64;
509
510 drop(tx);
512
513 let mut stats = self.stats.write();
514 stats.transactions_committed += 1;
515 stats.max_transaction_duration_ms = stats.max_transaction_duration_ms.max(duration);
516 stats.avg_transaction_duration_ms =
517 (stats.avg_transaction_duration_ms + duration as f64) / 2.0;
518
519 self.active_transactions.remove(transaction_id);
521 stats.active_transactions = self.active_transactions.len();
522
523 #[cfg(not(test))]
525 info!("Transaction {} committed in {}ms", transaction_id, duration);
526 Ok(())
527 }
528
529 pub async fn abort_transaction(&self, transaction_id: &str) -> Result<()> {
531 let tx = self
532 .active_transactions
533 .get(transaction_id)
534 .ok_or_else(|| anyhow!("Transaction not found: {}", transaction_id))?;
535
536 {
537 let mut metadata = tx.write();
538 metadata.state = TransactionState::Aborting;
539 }
540
541 if self.config.enable_wal {
543 self.write_wal_entry(LogEntryType::Abort, transaction_id, Vec::new())
544 .await?;
545 }
546
547 {
549 let mut metadata = tx.write();
550 metadata.state = TransactionState::Aborted;
551 metadata.end_time = Some(Utc::now());
552 }
553
554 drop(tx);
556
557 let mut stats = self.stats.write();
559 stats.transactions_aborted += 1;
560
561 self.active_transactions.remove(transaction_id);
563 stats.active_transactions = self.active_transactions.len();
564
565 #[cfg(not(test))]
566 info!("Transaction {} aborted", transaction_id);
567 Ok(())
568 }
569
570 pub fn is_event_processed(&self, event: &StreamEvent) -> bool {
572 let event_id = self.get_event_id(event);
573 self.processed_events.contains_key(&event_id)
574 }
575
576 pub fn mark_event_processed(&self, event: &StreamEvent) {
578 let event_id = self.get_event_id(event);
579 self.processed_events.insert(event_id, Utc::now());
580 }
581
582 fn get_event_id(&self, event: &StreamEvent) -> String {
584 let metadata = match event {
586 StreamEvent::TripleAdded { metadata, .. }
587 | StreamEvent::TripleRemoved { metadata, .. }
588 | StreamEvent::QuadAdded { metadata, .. }
589 | StreamEvent::QuadRemoved { metadata, .. }
590 | StreamEvent::GraphCreated { metadata, .. }
591 | StreamEvent::GraphCleared { metadata, .. }
592 | StreamEvent::GraphDeleted { metadata, .. }
593 | StreamEvent::GraphMetadataUpdated { metadata, .. }
594 | StreamEvent::GraphPermissionsChanged { metadata, .. }
595 | StreamEvent::GraphStatisticsUpdated { metadata, .. }
596 | StreamEvent::GraphRenamed { metadata, .. }
597 | StreamEvent::GraphMerged { metadata, .. }
598 | StreamEvent::GraphSplit { metadata, .. }
599 | StreamEvent::SparqlUpdate { metadata, .. }
600 | StreamEvent::QueryCompleted { metadata, .. }
601 | StreamEvent::QueryResultAdded { metadata, .. }
602 | StreamEvent::QueryResultRemoved { metadata, .. }
603 | StreamEvent::TransactionBegin { metadata, .. }
604 | StreamEvent::TransactionCommit { metadata, .. }
605 | StreamEvent::TransactionAbort { metadata, .. }
606 | StreamEvent::SchemaChanged { metadata, .. }
607 | StreamEvent::SchemaDefinitionAdded { metadata, .. }
608 | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
609 | StreamEvent::SchemaDefinitionModified { metadata, .. }
610 | StreamEvent::OntologyImported { metadata, .. }
611 | StreamEvent::OntologyRemoved { metadata, .. }
612 | StreamEvent::ConstraintAdded { metadata, .. }
613 | StreamEvent::ConstraintRemoved { metadata, .. }
614 | StreamEvent::ConstraintViolated { metadata, .. }
615 | StreamEvent::IndexCreated { metadata, .. }
616 | StreamEvent::IndexDropped { metadata, .. }
617 | StreamEvent::IndexRebuilt { metadata, .. }
618 | StreamEvent::SchemaUpdated { metadata, .. }
619 | StreamEvent::ShapeAdded { metadata, .. }
620 | StreamEvent::ShapeRemoved { metadata, .. }
621 | StreamEvent::ShapeModified { metadata, .. }
622 | StreamEvent::ShapeUpdated { metadata, .. }
623 | StreamEvent::ShapeValidationStarted { metadata, .. }
624 | StreamEvent::ShapeValidationCompleted { metadata, .. }
625 | StreamEvent::ShapeViolationDetected { metadata, .. }
626 | StreamEvent::Heartbeat { metadata, .. }
627 | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
628 };
629 metadata.event_id.clone()
630 }
631
632 async fn write_wal_entry(
634 &self,
635 entry_type: LogEntryType,
636 transaction_id: &str,
637 events: Vec<StreamEvent>,
638 ) -> Result<()> {
639 let mut log = self.transaction_log.write();
640
641 let entry = TransactionLogEntry {
642 id: log.len() as u64,
643 transaction_id: transaction_id.to_string(),
644 entry_type,
645 timestamp: Utc::now(),
646 events,
647 checksum: self.compute_checksum(transaction_id),
648 };
649
650 log.push(entry);
651
652 let mut stats = self.stats.write();
653 stats.wal_entries_written += 1;
654
655 Ok(())
656 }
657
658 fn compute_checksum(&self, data: &str) -> String {
660 use sha2::{Digest, Sha256};
662 let mut hasher = Sha256::new();
663 hasher.update(data.as_bytes());
664 hex::encode(hasher.finalize())
665 }
666
667 pub async fn create_checkpoint(&self) -> Result<String> {
669 let _ = self.command_tx.send(TransactionCommand::Checkpoint);
670 Ok("Checkpoint scheduled".to_string())
671 }
672
673 async fn create_checkpoint_internal(
675 active_transactions: &Arc<DashMap<String, Arc<RwLock<TransactionMetadata>>>>,
676 transaction_log: &Arc<RwLock<Vec<TransactionLogEntry>>>,
677 checkpoints: &Arc<RwLock<Vec<TransactionCheckpoint>>>,
678 stats: &Arc<RwLock<TransactionalStats>>,
679 last_checkpoint: &Arc<RwLock<Instant>>,
680 ) -> Result<()> {
681 let checkpoint_id = Uuid::new_v4().to_string();
682
683 let active_tx_ids: Vec<String> = active_transactions
684 .iter()
685 .map(|entry| entry.key().clone())
686 .collect();
687
688 let event_offset = transaction_log.read().len() as u64;
689
690 let checkpoint = TransactionCheckpoint {
691 checkpoint_id: checkpoint_id.clone(),
692 timestamp: Utc::now(),
693 active_transactions: active_tx_ids,
694 last_committed_id: None,
695 event_offset,
696 };
697
698 checkpoints.write().push(checkpoint);
699 *last_checkpoint.write() = Instant::now();
700
701 let mut stats_guard = stats.write();
702 stats_guard.checkpoints_created += 1;
703
704 info!(
705 "Checkpoint {} created at offset {}",
706 checkpoint_id, event_offset
707 );
708 Ok(())
709 }
710
711 async fn cleanup_expired_events(
713 processed_events: &Arc<DashMap<String, DateTime<Utc>>>,
714 config: &TransactionalConfig,
715 ) {
716 let cutoff = Utc::now()
717 - chrono::Duration::from_std(config.idempotency_window)
718 .expect("idempotency_window should be valid chrono Duration");
719
720 processed_events.retain(|_, timestamp| *timestamp > cutoff);
721
722 debug!(
723 "Cleaned up expired events, {} remaining",
724 processed_events.len()
725 );
726 }
727
728 pub fn get_transaction_status(&self, transaction_id: &str) -> Option<TransactionState> {
730 self.active_transactions
731 .get(transaction_id)
732 .map(|tx| tx.read().state.clone())
733 }
734
735 pub fn get_stats(&self) -> TransactionalStats {
737 let mut stats = self.stats.read().clone();
738 stats.active_transactions = self.active_transactions.len();
739 stats
740 }
741
742 pub async fn recover_from_checkpoint(&self, checkpoint_id: &str) -> Result<()> {
744 let checkpoints = self.checkpoints.read();
745
746 let checkpoint = checkpoints
747 .iter()
748 .find(|cp| cp.checkpoint_id == checkpoint_id)
749 .ok_or_else(|| anyhow!("Checkpoint not found: {}", checkpoint_id))?;
750
751 for tx_id in &checkpoint.active_transactions {
753 info!("Recovering transaction: {}", tx_id);
754 }
756
757 info!(
758 "Recovered from checkpoint {} at offset {}",
759 checkpoint_id, checkpoint.event_offset
760 );
761 Ok(())
762 }
763
764 pub async fn shutdown(&mut self) -> Result<()> {
766 if let Some(shutdown_tx) = self.shutdown_tx.take() {
768 let _ = shutdown_tx.send(());
769 }
770
771 if let Some(task) = self._background_task.take() {
773 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
774 }
775
776 info!("Transactional processor shut down");
777 Ok(())
778 }
779}
780
781impl Drop for TransactionalProcessor {
782 fn drop(&mut self) {
783 if let Some(shutdown_tx) = self.shutdown_tx.take() {
785 let _ = shutdown_tx.send(());
786 }
787 }
788}
789
790#[cfg(test)]
791mod tests {
792 use super::*;
793 use crate::event::EventMetadata;
794
795 #[tokio::test]
796 async fn test_transaction_lifecycle() {
797 let config = TransactionalConfig {
799 enable_background_tasks: false,
800 enable_wal: false,
801 ..Default::default()
802 };
803 let processor = TransactionalProcessor::new(config);
804
805 let tx_id = processor
807 .begin_transaction(Some(IsolationLevel::ReadCommitted))
808 .await
809 .unwrap();
810
811 assert!(processor.active_transactions.contains_key(&tx_id));
812
813 let event = StreamEvent::SchemaChanged {
815 schema_type: crate::event::SchemaType::Ontology,
816 change_type: crate::event::SchemaChangeType::Added,
817 details: "test schema change".to_string(),
818 metadata: EventMetadata {
819 event_id: Uuid::new_v4().to_string(),
820 timestamp: Utc::now(),
821 source: "test".to_string(),
822 user: None,
823 context: None,
824 caused_by: None,
825 version: "1.0".to_string(),
826 properties: HashMap::new(),
827 checksum: None,
828 },
829 };
830
831 processor
832 .add_events(&tx_id, vec![event.clone()])
833 .await
834 .unwrap();
835
836 assert!(processor.prepare_transaction(&tx_id).await.unwrap());
838
839 processor.commit_transaction(&tx_id).await.unwrap();
841
842 assert!(!processor.active_transactions.contains_key(&tx_id));
844
845 let stats = processor.get_stats();
846 assert_eq!(stats.transactions_started, 1);
847 assert_eq!(stats.transactions_committed, 1);
848 }
849
850 #[tokio::test]
851 async fn test_transaction_abort() {
852 let config = TransactionalConfig {
854 enable_background_tasks: false,
855 enable_wal: false,
856 ..Default::default()
857 };
858 let processor = TransactionalProcessor::new(config);
859
860 let tx_id = processor.begin_transaction(None).await.unwrap();
861
862 processor.abort_transaction(&tx_id).await.unwrap();
863
864 assert!(!processor.active_transactions.contains_key(&tx_id));
865
866 let stats = processor.get_stats();
867 assert_eq!(stats.transactions_started, 1);
868 assert_eq!(stats.transactions_aborted, 1);
869 }
870
871 #[tokio::test]
872 async fn test_minimal() {
873 let config = TransactionalConfig {
874 enable_background_tasks: false,
875 enable_wal: false,
876 ..Default::default()
877 };
878 let _processor = TransactionalProcessor::new(config);
879 }
881
882 #[tokio::test]
883 async fn test_begin_only() {
884 let config = TransactionalConfig {
885 enable_background_tasks: false,
886 enable_wal: false,
887 ..Default::default()
888 };
889 let processor = TransactionalProcessor::new(config);
890
891 let _tx_id = processor.begin_transaction(None).await.unwrap();
893 }
895
896 #[tokio::test]
897 async fn test_begin_prepare_only() {
898 let config = TransactionalConfig {
899 enable_background_tasks: false,
900 enable_wal: false,
901 ..Default::default()
902 };
903 let processor = TransactionalProcessor::new(config);
904
905 let tx_id = processor.begin_transaction(None).await.unwrap();
906 processor.prepare_transaction(&tx_id).await.unwrap();
907 }
909
910 #[tokio::test]
911 async fn test_begin_prepare_commit() {
912 let config = TransactionalConfig {
913 enable_background_tasks: false,
914 enable_wal: false,
915 ..Default::default()
916 };
917 let processor = TransactionalProcessor::new(config);
918
919 let tx_id = processor.begin_transaction(None).await.unwrap();
920 processor.prepare_transaction(&tx_id).await.unwrap();
921 processor.commit_transaction(&tx_id).await.unwrap();
922 }
924
925 #[tokio::test]
926 async fn test_idempotency() {
927 let config = TransactionalConfig {
929 enable_background_tasks: false,
930 ..Default::default()
931 };
932 let processor = TransactionalProcessor::new(config);
933
934 let event = StreamEvent::SchemaChanged {
935 schema_type: crate::event::SchemaType::Ontology,
936 change_type: crate::event::SchemaChangeType::Added,
937 details: "test schema change".to_string(),
938 metadata: EventMetadata {
939 event_id: "test-event-123".to_string(),
940 timestamp: Utc::now(),
941 source: "test".to_string(),
942 user: None,
943 context: None,
944 caused_by: None,
945 version: "1.0".to_string(),
946 properties: HashMap::new(),
947 checksum: None,
948 },
949 };
950
951 assert!(!processor.is_event_processed(&event));
952
953 processor.mark_event_processed(&event);
954
955 assert!(processor.is_event_processed(&event));
956 }
957}