1use crate::common::CdcEvent;
56use serde::{Deserialize, Serialize};
57use std::collections::HashMap;
58use std::sync::atomic::{AtomicU64, Ordering};
59use std::sync::Arc;
60use tokio::sync::RwLock;
61use tracing::{debug, warn};
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "UPPERCASE")]
66pub enum TransactionStatus {
67 Begin,
69 End,
71}
72
73impl std::fmt::Display for TransactionStatus {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match self {
76 TransactionStatus::Begin => write!(f, "BEGIN"),
77 TransactionStatus::End => write!(f, "END"),
78 }
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
84pub struct DataCollectionEventCount {
85 pub data_collection: String,
87 pub event_count: u64,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct TransactionEvent {
96 pub status: TransactionStatus,
98 pub id: String,
100 pub ts_ms: i64,
102 #[serde(skip_serializing_if = "Option::is_none")]
104 pub event_count: Option<u64>,
105 #[serde(skip_serializing_if = "Option::is_none")]
107 pub data_collections: Option<Vec<DataCollectionEventCount>>,
108}
109
110impl TransactionEvent {
111 pub fn begin(txn_id: &str, lsn: &str, ts_ms: i64) -> Self {
113 Self {
114 status: TransactionStatus::Begin,
115 id: format!("{}:{}", txn_id, lsn),
116 ts_ms,
117 event_count: None,
118 data_collections: None,
119 }
120 }
121
122 pub fn end(
124 txn_id: &str,
125 lsn: &str,
126 ts_ms: i64,
127 event_count: u64,
128 data_collections: Vec<DataCollectionEventCount>,
129 ) -> Self {
130 Self {
131 status: TransactionStatus::End,
132 id: format!("{}:{}", txn_id, lsn),
133 ts_ms,
134 event_count: Some(event_count),
135 data_collections: Some(data_collections),
136 }
137 }
138
139 pub fn topic_name(topic_prefix: &str) -> String {
141 format!("{}.transaction", topic_prefix)
142 }
143
144 pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
146 serde_json::to_vec(self)
147 }
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
152pub struct TransactionContext {
153 pub id: String,
155 pub total_order: u64,
157 pub data_collection_order: u64,
159}
160
161impl TransactionContext {
162 pub fn new(id: impl Into<String>, total_order: u64, data_collection_order: u64) -> Self {
164 Self {
165 id: id.into(),
166 total_order,
167 data_collection_order,
168 }
169 }
170}
171
/// Bookkeeping for a transaction that has begun but not yet ended.
#[derive(Debug)]
struct InFlightTransaction {
    /// Transaction identifier as supplied at BEGIN.
    txn_id: String,
    /// Log position string (`lsn`) supplied at BEGIN.
    lsn: String,
    /// Timestamp (ms) supplied at BEGIN.
    start_ts_ms: i64,
    /// Timestamp (ms) of the most recently tracked event; starts out equal to
    /// `start_ts_ms`.
    latest_ts_ms: i64,
    /// Number of events tracked so far.
    total_event_count: u64,
    /// Events tracked so far, keyed by data collection name.
    data_collection_counts: HashMap<String, u64>,
}

impl InFlightTransaction {
    /// Creates an empty record for a transaction that just began.
    fn new(txn_id: String, lsn: String, ts_ms: i64) -> Self {
        Self {
            txn_id,
            lsn,
            start_ts_ms: ts_ms,
            latest_ts_ms: ts_ms,
            total_event_count: 0,
            data_collection_counts: HashMap::new(),
        }
    }

    /// Records one event for `data_collection`.
    ///
    /// Returns `(total_order, data_collection_order)`: the event's 1-based
    /// position within the whole transaction and within its data collection.
    /// `ts_ms` overwrites the stored "latest" timestamp unconditionally.
    fn track_event(&mut self, data_collection: &str, ts_ms: i64) -> (u64, u64) {
        self.total_event_count += 1;
        self.latest_ts_ms = ts_ms;

        // Look the key up by `&str` first so a `String` is only allocated the
        // first time a collection is seen, not on every tracked event.
        let dc_count = if let Some(count) = self.data_collection_counts.get_mut(data_collection) {
            *count += 1;
            *count
        } else {
            self.data_collection_counts
                .insert(data_collection.to_owned(), 1);
            1
        };

        (self.total_event_count, dc_count)
    }

    /// Milliseconds between BEGIN and the most recently tracked event.
    ///
    /// Uses a saturating subtraction, so extreme timestamps cannot overflow.
    fn duration_ms(&self) -> i64 {
        self.latest_ts_ms.saturating_sub(self.start_ts_ms)
    }
}
219
/// Settings for the transaction topic emitter.
#[derive(Clone, Debug)]
pub struct TransactionTopicConfig {
    /// Prefix of the transaction topic name (`"<prefix>.transaction"`).
    pub topic_prefix: String,
    /// Whether beginning a transaction yields a `BEGIN` event.
    pub emit_begin: bool,
    /// Whether ending a transaction yields an `END` event.
    pub emit_end: bool,
    /// Whether tracked events receive transaction context.
    pub enrich_events: bool,
    /// In-flight transaction count at which the emitter starts logging
    /// warnings. This is a threshold, not a hard cap.
    pub max_in_flight: usize,
}

/// Defaults: prefix `"rivven"`, every feature enabled, warning threshold 1000.
impl Default for TransactionTopicConfig {
    fn default() -> Self {
        let topic_prefix = String::from("rivven");
        TransactionTopicConfig {
            topic_prefix,
            emit_begin: true,
            emit_end: true,
            enrich_events: true,
            max_in_flight: 1000,
        }
    }
}
246
247impl TransactionTopicConfig {
248 pub fn builder() -> TransactionTopicConfigBuilder {
250 TransactionTopicConfigBuilder::default()
251 }
252
253 pub fn transaction_topic(&self) -> String {
255 TransactionEvent::topic_name(&self.topic_prefix)
256 }
257}
258
259#[derive(Default)]
261pub struct TransactionTopicConfigBuilder {
262 config: TransactionTopicConfig,
263}
264
265impl TransactionTopicConfigBuilder {
266 pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
268 self.config.topic_prefix = prefix.into();
269 self
270 }
271
272 pub fn emit_begin(mut self, emit: bool) -> Self {
274 self.config.emit_begin = emit;
275 self
276 }
277
278 pub fn emit_end(mut self, emit: bool) -> Self {
280 self.config.emit_end = emit;
281 self
282 }
283
284 pub fn enrich_events(mut self, enrich: bool) -> Self {
286 self.config.enrich_events = enrich;
287 self
288 }
289
290 pub fn max_in_flight(mut self, max: usize) -> Self {
292 self.config.max_in_flight = max;
293 self
294 }
295
296 pub fn build(self) -> TransactionTopicConfig {
298 self.config
299 }
300}
301
/// Live counters maintained by the transaction topic emitter.
///
/// Every field is an atomic, so the struct can be updated through a shared
/// reference.
#[derive(Default, Debug)]
pub struct TransactionTopicStats {
    /// `BEGIN` events emitted.
    pub begin_events: AtomicU64,
    /// `END` events emitted.
    pub end_events: AtomicU64,
    /// Events that were handed a transaction context.
    pub events_enriched: AtomicU64,
    /// Transactions currently tracked (a gauge rather than a running total).
    pub in_flight_transactions: AtomicU64,
    /// Tracked transactions that have been ended.
    pub transactions_completed: AtomicU64,
    /// `END` notifications for transactions that were not being tracked.
    pub orphan_ends: AtomicU64,
}

impl TransactionTopicStats {
    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is read separately with relaxed ordering, so the snapshot
    /// is not an atomic view across all fields.
    pub fn snapshot(&self) -> TransactionTopicStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let begin_events = read(&self.begin_events);
        let end_events = read(&self.end_events);
        let events_enriched = read(&self.events_enriched);
        let in_flight_transactions = read(&self.in_flight_transactions);
        let transactions_completed = read(&self.transactions_completed);
        let orphan_ends = read(&self.orphan_ends);

        TransactionTopicStatsSnapshot {
            begin_events,
            end_events,
            events_enriched,
            in_flight_transactions,
            transactions_completed,
            orphan_ends,
        }
    }
}

/// Plain-value copy of [`TransactionTopicStats`] taken at one moment.
#[derive(Default, Clone, Debug)]
pub struct TransactionTopicStatsSnapshot {
    /// `BEGIN` events emitted.
    pub begin_events: u64,
    /// `END` events emitted.
    pub end_events: u64,
    /// Events that were handed a transaction context.
    pub events_enriched: u64,
    /// Transactions tracked at the time of the snapshot.
    pub in_flight_transactions: u64,
    /// Tracked transactions that have been ended.
    pub transactions_completed: u64,
    /// `END` notifications for untracked transactions.
    pub orphan_ends: u64,
}
343
344#[derive(Debug)]
348pub struct TransactionTopicEmitter {
349 config: TransactionTopicConfig,
350 in_flight: Arc<RwLock<HashMap<String, InFlightTransaction>>>,
352 stats: Arc<TransactionTopicStats>,
354}
355
356impl TransactionTopicEmitter {
357 pub fn new(config: TransactionTopicConfig) -> Self {
359 Self {
360 config,
361 in_flight: Arc::new(RwLock::new(HashMap::new())),
362 stats: Arc::new(TransactionTopicStats::default()),
363 }
364 }
365
366 pub fn topic_name(&self) -> String {
368 self.config.transaction_topic()
369 }
370
371 pub fn stats(&self) -> TransactionTopicStatsSnapshot {
373 self.stats.snapshot()
374 }
375
376 pub async fn begin_transaction(
380 &self,
381 txn_id: &str,
382 lsn: &str,
383 ts_ms: i64,
384 ) -> Option<TransactionEvent> {
385 let mut in_flight = self.in_flight.write().await;
386
387 if in_flight.len() >= self.config.max_in_flight {
389 warn!(
390 txn_id = txn_id,
391 in_flight = in_flight.len(),
392 max = self.config.max_in_flight,
393 "High number of in-flight transactions"
394 );
395 }
396
397 let txn = InFlightTransaction::new(txn_id.to_string(), lsn.to_string(), ts_ms);
399 in_flight.insert(txn_id.to_string(), txn);
400
401 self.stats
402 .in_flight_transactions
403 .store(in_flight.len() as u64, Ordering::Relaxed);
404
405 if self.config.emit_begin {
406 self.stats.begin_events.fetch_add(1, Ordering::Relaxed);
407 debug!(txn_id = txn_id, lsn = lsn, "Transaction BEGIN");
408 Some(TransactionEvent::begin(txn_id, lsn, ts_ms))
409 } else {
410 None
411 }
412 }
413
414 pub async fn track_event(
419 &self,
420 txn_id: &str,
421 data_collection: &str,
422 ts_ms: i64,
423 ) -> Option<TransactionContext> {
424 let mut in_flight = self.in_flight.write().await;
425
426 if let Some(txn) = in_flight.get_mut(txn_id) {
427 let (total_order, dc_order) = txn.track_event(data_collection, ts_ms);
428
429 if self.config.enrich_events {
430 self.stats.events_enriched.fetch_add(1, Ordering::Relaxed);
431 Some(TransactionContext::new(
432 format!("{}:{}", txn_id, txn.lsn),
433 total_order,
434 dc_order,
435 ))
436 } else {
437 None
438 }
439 } else {
440 debug!(txn_id = txn_id, "Event for unknown transaction");
441 None
442 }
443 }
444
445 pub async fn end_transaction(&self, txn_id: &str, ts_ms: i64) -> Option<TransactionEvent> {
449 let mut in_flight = self.in_flight.write().await;
450
451 if let Some(txn) = in_flight.remove(txn_id) {
452 self.stats
453 .in_flight_transactions
454 .store(in_flight.len() as u64, Ordering::Relaxed);
455 self.stats
456 .transactions_completed
457 .fetch_add(1, Ordering::Relaxed);
458
459 let duration_ms = txn.duration_ms();
460
461 if self.config.emit_end {
462 self.stats.end_events.fetch_add(1, Ordering::Relaxed);
463
464 let data_collections: Vec<DataCollectionEventCount> = txn
466 .data_collection_counts
467 .into_iter()
468 .map(|(dc, count)| DataCollectionEventCount {
469 data_collection: dc,
470 event_count: count,
471 })
472 .collect();
473
474 debug!(
475 txn_id = txn_id,
476 event_count = txn.total_event_count,
477 tables = data_collections.len(),
478 duration_ms = duration_ms,
479 "Transaction END"
480 );
481
482 Some(TransactionEvent::end(
483 &txn.txn_id,
484 &txn.lsn,
485 ts_ms,
486 txn.total_event_count,
487 data_collections,
488 ))
489 } else {
490 None
491 }
492 } else {
493 self.stats.orphan_ends.fetch_add(1, Ordering::Relaxed);
495 warn!(txn_id = txn_id, "END for unknown transaction (orphan)");
496 None
497 }
498 }
499
500 pub async fn in_flight_count(&self) -> usize {
502 self.in_flight.read().await.len()
503 }
504
505 pub async fn clear_in_flight(&self) {
507 let mut in_flight = self.in_flight.write().await;
508 let count = in_flight.len();
509 in_flight.clear();
510 self.stats
511 .in_flight_transactions
512 .store(0, Ordering::Relaxed);
513
514 if count > 0 {
515 warn!(count = count, "Cleared in-flight transactions");
516 }
517 }
518
519 pub async fn enrich_event(&self, event: &mut CdcEvent) -> bool {
523 if !self.config.enrich_events {
524 return false;
525 }
526
527 let txn_id = match &event.transaction {
529 Some(meta) => meta.id.clone(),
530 None => return false,
531 };
532
533 let data_collection = format!("{}.{}", event.schema, event.table);
534
535 if let Some(ctx) = self
536 .track_event(&txn_id, &data_collection, event.timestamp)
537 .await
538 {
539 if let Some(ref mut meta) = event.transaction {
541 meta.id = ctx.id;
542 meta.sequence = ctx.total_order;
543 }
544 true
545 } else {
546 false
547 }
548 }
549}
550
551impl Clone for TransactionTopicEmitter {
552 fn clone(&self) -> Self {
553 Self {
554 config: self.config.clone(),
555 in_flight: Arc::clone(&self.in_flight),
556 stats: Arc::clone(&self.stats),
557 }
558 }
559}
560
#[cfg(test)]
mod tests {
    use super::*;

    /// A BEGIN event carries the composite id and no counts.
    #[test]
    fn test_transaction_event_begin() {
        let event = TransactionEvent::begin("555", "0/ABC123", 1705123456789);

        assert_eq!(event.status, TransactionStatus::Begin);
        assert_eq!(event.id, "555:0/ABC123");
        assert_eq!(event.ts_ms, 1705123456789);
        assert!(event.event_count.is_none());
        assert!(event.data_collections.is_none());
    }

    /// An END event keeps the counts it was given, in the given order.
    #[test]
    fn test_transaction_event_end() {
        let data_collections = vec![
            DataCollectionEventCount {
                data_collection: "public.orders".to_string(),
                event_count: 3,
            },
            DataCollectionEventCount {
                data_collection: "public.customers".to_string(),
                event_count: 2,
            },
        ];

        let event = TransactionEvent::end("555", "0/ABC123", 1705123456999, 5, data_collections);

        assert_eq!(event.status, TransactionStatus::End);
        assert_eq!(event.id, "555:0/ABC123");
        assert_eq!(event.event_count, Some(5));

        let dcs = event.data_collections.unwrap();
        assert_eq!(dcs.len(), 2);
        assert_eq!(dcs[0].data_collection, "public.orders");
        assert_eq!(dcs[0].event_count, 3);
    }

    /// BEGIN serializes without the optional count fields.
    #[test]
    fn test_transaction_event_serialization() {
        let event = TransactionEvent::begin("555", "0/ABC", 1705123456789);
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains("\"status\":\"BEGIN\""));
        assert!(json.contains("\"id\":\"555:0/ABC\""));
        assert!(json.contains("\"ts_ms\":1705123456789"));
        // `None` fields are skipped entirely rather than emitted as null.
        assert!(!json.contains("event_count"));
        assert!(!json.contains("data_collections"));
    }

    /// END serializes with both count fields present.
    #[test]
    fn test_transaction_event_end_serialization() {
        let data_collections = vec![DataCollectionEventCount {
            data_collection: "public.test".to_string(),
            event_count: 10,
        }];
        let event = TransactionEvent::end("123", "0/FF", 1705123456999, 10, data_collections);
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains("\"status\":\"END\""));
        assert!(json.contains("\"event_count\":10"));
        assert!(json.contains("\"data_collections\""));
        assert!(json.contains("public.test"));
    }

    #[test]
    fn test_topic_name() {
        assert_eq!(
            TransactionEvent::topic_name("fulfillment"),
            "fulfillment.transaction"
        );
        assert_eq!(TransactionEvent::topic_name("mydb"), "mydb.transaction");
    }

    #[test]
    fn test_config_builder() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("testdb")
            .emit_begin(true)
            .emit_end(true)
            .enrich_events(false)
            .max_in_flight(500)
            .build();

        assert_eq!(config.topic_prefix, "testdb");
        assert!(config.emit_begin);
        assert!(config.emit_end);
        assert!(!config.enrich_events);
        assert_eq!(config.max_in_flight, 500);
        assert_eq!(config.transaction_topic(), "testdb.transaction");
    }

    /// Full lifecycle: BEGIN, three events over two collections, END.
    #[tokio::test]
    async fn test_emitter_full_transaction() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("test")
            .emit_begin(true)
            .emit_end(true)
            .enrich_events(true)
            .build();

        let emitter = TransactionTopicEmitter::new(config);

        // BEGIN
        let begin = emitter
            .begin_transaction("txn-1", "0/100", 1000)
            .await
            .unwrap();
        assert_eq!(begin.status, TransactionStatus::Begin);
        assert_eq!(begin.id, "txn-1:0/100");

        assert_eq!(emitter.in_flight_count().await, 1);
        assert_eq!(emitter.stats().in_flight_transactions, 1);

        // First event for public.orders.
        let ctx1 = emitter
            .track_event("txn-1", "public.orders", 1001)
            .await
            .unwrap();
        assert_eq!(ctx1.total_order, 1);
        assert_eq!(ctx1.data_collection_order, 1);

        let ctx2 = emitter
            .track_event("txn-1", "public.orders", 1002)
            .await
            .unwrap();
        assert_eq!(ctx2.total_order, 2);
        assert_eq!(ctx2.data_collection_order, 2);

        // A different collection restarts the per-collection order.
        let ctx3 = emitter
            .track_event("txn-1", "public.customers", 1003)
            .await
            .unwrap();
        assert_eq!(ctx3.total_order, 3);
        assert_eq!(ctx3.data_collection_order, 1);

        // END
        let end = emitter.end_transaction("txn-1", 1010).await.unwrap();
        assert_eq!(end.status, TransactionStatus::End);
        assert_eq!(end.event_count, Some(3));

        let dcs = end.data_collections.unwrap();
        assert_eq!(dcs.len(), 2);

        // Look entries up by name; do not depend on their position.
        let orders_count = dcs
            .iter()
            .find(|dc| dc.data_collection == "public.orders")
            .unwrap()
            .event_count;
        assert_eq!(orders_count, 2);

        let customers_count = dcs
            .iter()
            .find(|dc| dc.data_collection == "public.customers")
            .unwrap()
            .event_count;
        assert_eq!(customers_count, 1);

        // The transaction is no longer tracked.
        assert_eq!(emitter.in_flight_count().await, 0);

        let stats = emitter.stats();
        assert_eq!(stats.begin_events, 1);
        assert_eq!(stats.end_events, 1);
        assert_eq!(stats.events_enriched, 3);
        assert_eq!(stats.transactions_completed, 1);
        assert_eq!(stats.in_flight_transactions, 0);
        assert_eq!(stats.orphan_ends, 0);
    }

    /// Ending an unknown transaction yields nothing and is counted as an orphan.
    #[tokio::test]
    async fn test_emitter_orphan_end() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("test")
            .emit_end(true)
            .build();

        let emitter = TransactionTopicEmitter::new(config);

        let end = emitter.end_transaction("unknown", 1000).await;
        assert!(end.is_none());

        let stats = emitter.stats();
        assert_eq!(stats.orphan_ends, 1);
    }

    /// With enrichment disabled no context is returned and nothing is counted as enriched.
    #[tokio::test]
    async fn test_emitter_no_enrichment() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("test")
            .enrich_events(false)
            .build();

        let emitter = TransactionTopicEmitter::new(config);

        emitter.begin_transaction("txn-1", "0/100", 1000).await;

        let ctx = emitter.track_event("txn-1", "public.test", 1001).await;
        assert!(ctx.is_none());

        let stats = emitter.stats();
        assert_eq!(stats.events_enriched, 0);
    }

    /// Clearing drops every tracked transaction and resets the gauge.
    #[tokio::test]
    async fn test_emitter_clear_in_flight() {
        let config = TransactionTopicConfig::default();
        let emitter = TransactionTopicEmitter::new(config);

        emitter.begin_transaction("txn-1", "0/100", 1000).await;
        emitter.begin_transaction("txn-2", "0/200", 1001).await;

        assert_eq!(emitter.in_flight_count().await, 2);
        assert_eq!(emitter.stats().in_flight_transactions, 2);

        emitter.clear_in_flight().await;

        assert_eq!(emitter.in_flight_count().await, 0);
        assert_eq!(emitter.stats().in_flight_transactions, 0);
    }

    /// Plain synchronous test: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_transaction_context() {
        let ctx = TransactionContext::new("txn-1:0/ABC", 5, 3);

        assert_eq!(ctx.id, "txn-1:0/ABC");
        assert_eq!(ctx.total_order, 5);
        assert_eq!(ctx.data_collection_order, 3);

        let json = serde_json::to_string(&ctx).unwrap();
        assert!(json.contains("\"id\":\"txn-1:0/ABC\""));
        assert!(json.contains("\"total_order\":5"));
        assert!(json.contains("\"data_collection_order\":3"));
    }

    #[test]
    fn test_data_collection_event_count_serialization() {
        let dc = DataCollectionEventCount {
            data_collection: "inventory.products".to_string(),
            event_count: 42,
        };

        let json = serde_json::to_string(&dc).unwrap();
        assert!(json.contains("\"data_collection\":\"inventory.products\""));
        assert!(json.contains("\"event_count\":42"));
    }
}