Skip to main content

rivven_cdc/common/
transaction_topic.rs

1//! # Transaction Metadata Topic
2//!
3//! Transaction boundary events for exactly-once processing.
4//!
5//! When `provide.transaction.metadata` is enabled, emits:
6//! - **BEGIN** events when transaction starts
7//! - **END** events when transaction commits with event counts
8//!
9//! Also enriches individual CDC events with transaction context:
10//! - `total_order` - Absolute position among all transaction events
11//! - `data_collection_order` - Position within the table
12//!
13//! ## Topic Naming
14//!
15//! Transaction events are published to `<topic_prefix>.transaction`
16//!
17//! ## Event Format
18//!
19//! ```json
20//! {
21//!   "status": "BEGIN|END",
22//!   "id": "txn-12345:0/ABC123",
23//!   "ts_ms": 1705123456789,
24//!   "event_count": 5,
25//!   "data_collections": [
26//!     {"data_collection": "public.orders", "event_count": 3},
27//!     {"data_collection": "public.customers", "event_count": 2}
28//!   ]
29//! }
30//! ```
31//!
32//! ## Usage
33//!
34//! ```rust,ignore
35//! use rivven_cdc::common::transaction_topic::{TransactionTopicEmitter, TransactionTopicConfig};
36//!
37//! let config = TransactionTopicConfig::builder()
38//!     .topic_prefix("mydb")
39//!     .build();
40//!
41//! let emitter = TransactionTopicEmitter::new(config);
42//!
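//! // Register transaction begin (async; yields `Some(event)` when BEGIN emission is enabled)
//! let begin_event = emitter.begin_transaction("txn-123", "0/ABC", 1705123456789).await;
//!
//! // Track events (each call also takes the event timestamp in milliseconds)
//! emitter.track_event("txn-123", "public.orders", 1705123456790).await;
//! emitter.track_event("txn-123", "public.orders", 1705123456791).await;
//! emitter.track_event("txn-123", "public.customers", 1705123456792).await;
//!
//! // Emit end event (async; yields `Some(event)` when END emission is enabled)
//! let end_event = emitter.end_transaction("txn-123", 1705123456999).await;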
53//! ```
54
55use crate::common::CdcEvent;
56use serde::{Deserialize, Serialize};
57use std::collections::HashMap;
58use std::sync::atomic::{AtomicU64, Ordering};
59use std::sync::Arc;
60use tokio::sync::RwLock;
61use tracing::{debug, warn};
62
63/// Transaction status in boundary events.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "UPPERCASE")]
66pub enum TransactionStatus {
67    /// Transaction has started
68    Begin,
69    /// Transaction has committed
70    End,
71}
72
73impl std::fmt::Display for TransactionStatus {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            TransactionStatus::Begin => write!(f, "BEGIN"),
77            TransactionStatus::End => write!(f, "END"),
78        }
79    }
80}
81
82/// Event count per data collection (table) in a transaction.
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
84pub struct DataCollectionEventCount {
85    /// Fully qualified table name: schema.table
86    pub data_collection: String,
87    /// Number of events from this table
88    pub event_count: u64,
89}
90
91/// Transaction boundary event (BEGIN or END).
92///
93/// Published to `<topic_prefix>.transaction` topic.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct TransactionEvent {
96    /// Status: BEGIN or END
97    pub status: TransactionStatus,
98    /// Transaction ID (format: "txId:LSN")
99    pub id: String,
100    /// Event timestamp in milliseconds since epoch
101    pub ts_ms: i64,
102    /// Total event count (only for END events)
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub event_count: Option<u64>,
105    /// Per-table event counts (only for END events)
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub data_collections: Option<Vec<DataCollectionEventCount>>,
108}
109
110impl TransactionEvent {
111    /// Create a BEGIN transaction event.
112    pub fn begin(txn_id: &str, lsn: &str, ts_ms: i64) -> Self {
113        Self {
114            status: TransactionStatus::Begin,
115            id: format!("{}:{}", txn_id, lsn),
116            ts_ms,
117            event_count: None,
118            data_collections: None,
119        }
120    }
121
122    /// Create an END transaction event.
123    pub fn end(
124        txn_id: &str,
125        lsn: &str,
126        ts_ms: i64,
127        event_count: u64,
128        data_collections: Vec<DataCollectionEventCount>,
129    ) -> Self {
130        Self {
131            status: TransactionStatus::End,
132            id: format!("{}:{}", txn_id, lsn),
133            ts_ms,
134            event_count: Some(event_count),
135            data_collections: Some(data_collections),
136        }
137    }
138
139    /// Get the topic name for transaction events.
140    pub fn topic_name(topic_prefix: &str) -> String {
141        format!("{}.transaction", topic_prefix)
142    }
143
144    /// Serialize to JSON bytes.
145    pub fn to_json_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
146        serde_json::to_vec(self)
147    }
148}
149
150/// Transaction context added to individual CDC events.
151#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
152pub struct TransactionContext {
153    /// Transaction ID (format: "txId:LSN")
154    pub id: String,
155    /// Absolute position among all events in the transaction (1-based)
156    pub total_order: u64,
157    /// Position among events for this data collection (1-based)
158    pub data_collection_order: u64,
159}
160
161impl TransactionContext {
162    /// Create a new transaction context.
163    pub fn new(id: impl Into<String>, total_order: u64, data_collection_order: u64) -> Self {
164        Self {
165            id: id.into(),
166            total_order,
167            data_collection_order,
168        }
169    }
170}
171
/// Bookkeeping for a transaction that has begun but not yet ended.
#[derive(Debug)]
struct InFlightTransaction {
    /// Transaction ID
    txn_id: String,
    /// LSN/position recorded when the transaction began
    lsn: String,
    /// Timestamp supplied at creation (used as the start for duration tracking)
    start_ts_ms: i64,
    /// Timestamp of the most recently tracked event (equals the start until one is tracked)
    latest_ts_ms: i64,
    /// Number of events tracked so far across all tables
    total_event_count: u64,
    /// Number of events tracked so far, per table
    data_collection_counts: HashMap<String, u64>,
}

impl InFlightTransaction {
    /// Create the state for a freshly started transaction with zero tracked events.
    fn new(txn_id: String, lsn: String, ts_ms: i64) -> Self {
        Self {
            txn_id,
            lsn,
            start_ts_ms: ts_ms,
            latest_ts_ms: ts_ms,
            total_event_count: 0,
            data_collection_counts: HashMap::new(),
        }
    }

    /// Record one event for `data_collection` observed at `ts_ms`.
    ///
    /// Returns `(total_order, data_collection_order)`: the 1-based position of
    /// the event within the whole transaction and within its table.
    fn track_event(&mut self, data_collection: &str, ts_ms: i64) -> (u64, u64) {
        self.total_event_count += 1;
        self.latest_ts_ms = ts_ms;

        // Look the table up by borrowed key first so the owned `String` key is
        // only allocated the first time a table shows up in this transaction,
        // not on every single event.
        let dc_order = match self.data_collection_counts.get_mut(data_collection) {
            Some(count) => {
                *count += 1;
                *count
            }
            None => {
                self.data_collection_counts
                    .insert(data_collection.to_owned(), 1);
                1
            }
        };

        (self.total_event_count, dc_order)
    }

    /// Milliseconds between the start timestamp and the latest tracked event
    /// timestamp (saturating on overflow).
    fn duration_ms(&self) -> i64 {
        self.latest_ts_ms.saturating_sub(self.start_ts_ms)
    }
}
219
/// Settings that control transaction topic emission.
#[derive(Clone, Debug)]
pub struct TransactionTopicConfig {
    /// Prefix of the topic name (e.g. "mydb" yields "mydb.transaction")
    pub topic_prefix: String,
    /// Emit BEGIN events when `true` (default: true)
    pub emit_begin: bool,
    /// Emit END events when `true` (default: true)
    pub emit_end: bool,
    /// Attach transaction context to CDC events when `true` (default: true)
    pub enrich_events: bool,
    /// In-flight transaction count at which a warning is logged (default: 1000)
    pub max_in_flight: usize,
}

impl Default for TransactionTopicConfig {
    /// Defaults: prefix `"rivven"`, every feature enabled, warning threshold 1000.
    fn default() -> Self {
        let topic_prefix = String::from("rivven");
        Self {
            topic_prefix,
            emit_begin: true,
            emit_end: true,
            enrich_events: true,
            max_in_flight: 1000,
        }
    }
}
246
247impl TransactionTopicConfig {
248    /// Create a builder for TransactionTopicConfig.
249    pub fn builder() -> TransactionTopicConfigBuilder {
250        TransactionTopicConfigBuilder::default()
251    }
252
253    /// Get the transaction topic name.
254    pub fn transaction_topic(&self) -> String {
255        TransactionEvent::topic_name(&self.topic_prefix)
256    }
257}
258
259/// Builder for TransactionTopicConfig.
260#[derive(Default)]
261pub struct TransactionTopicConfigBuilder {
262    config: TransactionTopicConfig,
263}
264
265impl TransactionTopicConfigBuilder {
266    /// Set the topic prefix.
267    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
268        self.config.topic_prefix = prefix.into();
269        self
270    }
271
272    /// Enable/disable BEGIN event emission.
273    pub fn emit_begin(mut self, emit: bool) -> Self {
274        self.config.emit_begin = emit;
275        self
276    }
277
278    /// Enable/disable END event emission.
279    pub fn emit_end(mut self, emit: bool) -> Self {
280        self.config.emit_end = emit;
281        self
282    }
283
284    /// Enable/disable event enrichment with transaction context.
285    pub fn enrich_events(mut self, enrich: bool) -> Self {
286        self.config.enrich_events = enrich;
287        self
288    }
289
290    /// Set max in-flight transactions.
291    pub fn max_in_flight(mut self, max: usize) -> Self {
292        self.config.max_in_flight = max;
293        self
294    }
295
296    /// Build the configuration.
297    pub fn build(self) -> TransactionTopicConfig {
298        self.config
299    }
300}
301
/// Live counters describing transaction topic emission.
#[derive(Debug, Default)]
pub struct TransactionTopicStats {
    /// Number of BEGIN events emitted
    pub begin_events: AtomicU64,
    /// Number of END events emitted
    pub end_events: AtomicU64,
    /// Number of events enriched with transaction context
    pub events_enriched: AtomicU64,
    /// Number of transactions currently in flight
    pub in_flight_transactions: AtomicU64,
    /// Number of transactions completed
    pub transactions_completed: AtomicU64,
    /// Number of END notifications that had no matching BEGIN
    pub orphan_ends: AtomicU64,
}

impl TransactionTopicStats {
    /// Copy the current counter values into a plain-data snapshot.
    ///
    /// Each counter is read independently with relaxed ordering.
    pub fn snapshot(&self) -> TransactionTopicStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let begin_events = read(&self.begin_events);
        let end_events = read(&self.end_events);
        let events_enriched = read(&self.events_enriched);
        let in_flight_transactions = read(&self.in_flight_transactions);
        let transactions_completed = read(&self.transactions_completed);
        let orphan_ends = read(&self.orphan_ends);

        TransactionTopicStatsSnapshot {
            begin_events,
            end_events,
            events_enriched,
            in_flight_transactions,
            transactions_completed,
            orphan_ends,
        }
    }
}

/// Plain-value copy of [`TransactionTopicStats`] taken at one point in time.
#[derive(Clone, Debug, Default)]
pub struct TransactionTopicStatsSnapshot {
    /// Number of BEGIN events emitted
    pub begin_events: u64,
    /// Number of END events emitted
    pub end_events: u64,
    /// Number of events enriched with transaction context
    pub events_enriched: u64,
    /// Number of transactions in flight when the snapshot was taken
    pub in_flight_transactions: u64,
    /// Number of transactions completed
    pub transactions_completed: u64,
    /// Number of END notifications that had no matching BEGIN
    pub orphan_ends: u64,
}
343
344/// Emitter for transaction boundary events.
345///
346/// Tracks in-flight transactions and emits BEGIN/END events to dedicated topic.
347#[derive(Debug)]
348pub struct TransactionTopicEmitter {
349    config: TransactionTopicConfig,
350    /// In-flight transactions keyed by txn_id
351    in_flight: Arc<RwLock<HashMap<String, InFlightTransaction>>>,
352    /// Statistics
353    stats: Arc<TransactionTopicStats>,
354}
355
356impl TransactionTopicEmitter {
357    /// Create a new transaction topic emitter.
358    pub fn new(config: TransactionTopicConfig) -> Self {
359        Self {
360            config,
361            in_flight: Arc::new(RwLock::new(HashMap::new())),
362            stats: Arc::new(TransactionTopicStats::default()),
363        }
364    }
365
366    /// Get the transaction topic name.
367    pub fn topic_name(&self) -> String {
368        self.config.transaction_topic()
369    }
370
371    /// Get current statistics.
372    pub fn stats(&self) -> TransactionTopicStatsSnapshot {
373        self.stats.snapshot()
374    }
375
376    /// Register transaction begin.
377    ///
378    /// Returns BEGIN event if emission is enabled.
379    pub async fn begin_transaction(
380        &self,
381        txn_id: &str,
382        lsn: &str,
383        ts_ms: i64,
384    ) -> Option<TransactionEvent> {
385        let mut in_flight = self.in_flight.write().await;
386
387        // Check for limit
388        if in_flight.len() >= self.config.max_in_flight {
389            warn!(
390                txn_id = txn_id,
391                in_flight = in_flight.len(),
392                max = self.config.max_in_flight,
393                "High number of in-flight transactions"
394            );
395        }
396
397        // Create in-flight entry
398        let txn = InFlightTransaction::new(txn_id.to_string(), lsn.to_string(), ts_ms);
399        in_flight.insert(txn_id.to_string(), txn);
400
401        self.stats
402            .in_flight_transactions
403            .store(in_flight.len() as u64, Ordering::Relaxed);
404
405        if self.config.emit_begin {
406            self.stats.begin_events.fetch_add(1, Ordering::Relaxed);
407            debug!(txn_id = txn_id, lsn = lsn, "Transaction BEGIN");
408            Some(TransactionEvent::begin(txn_id, lsn, ts_ms))
409        } else {
410            None
411        }
412    }
413
414    /// Track an event within a transaction.
415    ///
416    /// Returns (total_order, data_collection_order) for event enrichment.
417    /// Returns None if transaction not found.
418    pub async fn track_event(
419        &self,
420        txn_id: &str,
421        data_collection: &str,
422        ts_ms: i64,
423    ) -> Option<TransactionContext> {
424        let mut in_flight = self.in_flight.write().await;
425
426        if let Some(txn) = in_flight.get_mut(txn_id) {
427            let (total_order, dc_order) = txn.track_event(data_collection, ts_ms);
428
429            if self.config.enrich_events {
430                self.stats.events_enriched.fetch_add(1, Ordering::Relaxed);
431                Some(TransactionContext::new(
432                    format!("{}:{}", txn_id, txn.lsn),
433                    total_order,
434                    dc_order,
435                ))
436            } else {
437                None
438            }
439        } else {
440            debug!(txn_id = txn_id, "Event for unknown transaction");
441            None
442        }
443    }
444
445    /// End a transaction and emit END event.
446    ///
447    /// Returns END event if emission is enabled.
448    pub async fn end_transaction(&self, txn_id: &str, ts_ms: i64) -> Option<TransactionEvent> {
449        let mut in_flight = self.in_flight.write().await;
450
451        if let Some(txn) = in_flight.remove(txn_id) {
452            self.stats
453                .in_flight_transactions
454                .store(in_flight.len() as u64, Ordering::Relaxed);
455            self.stats
456                .transactions_completed
457                .fetch_add(1, Ordering::Relaxed);
458
459            let duration_ms = txn.duration_ms();
460
461            if self.config.emit_end {
462                self.stats.end_events.fetch_add(1, Ordering::Relaxed);
463
464                // Build data collection counts
465                let data_collections: Vec<DataCollectionEventCount> = txn
466                    .data_collection_counts
467                    .into_iter()
468                    .map(|(dc, count)| DataCollectionEventCount {
469                        data_collection: dc,
470                        event_count: count,
471                    })
472                    .collect();
473
474                debug!(
475                    txn_id = txn_id,
476                    event_count = txn.total_event_count,
477                    tables = data_collections.len(),
478                    duration_ms = duration_ms,
479                    "Transaction END"
480                );
481
482                Some(TransactionEvent::end(
483                    &txn.txn_id,
484                    &txn.lsn,
485                    ts_ms,
486                    txn.total_event_count,
487                    data_collections,
488                ))
489            } else {
490                None
491            }
492        } else {
493            // Orphan END without BEGIN
494            self.stats.orphan_ends.fetch_add(1, Ordering::Relaxed);
495            warn!(txn_id = txn_id, "END for unknown transaction (orphan)");
496            None
497        }
498    }
499
500    /// Get number of in-flight transactions.
501    pub async fn in_flight_count(&self) -> usize {
502        self.in_flight.read().await.len()
503    }
504
505    /// Clear all in-flight transactions (e.g., on reconnect).
506    pub async fn clear_in_flight(&self) {
507        let mut in_flight = self.in_flight.write().await;
508        let count = in_flight.len();
509        in_flight.clear();
510        self.stats
511            .in_flight_transactions
512            .store(0, Ordering::Relaxed);
513
514        if count > 0 {
515            warn!(count = count, "Cleared in-flight transactions");
516        }
517    }
518
519    /// Enrich a CDC event with transaction context.
520    ///
521    /// Modifies the event's transaction metadata in place.
522    pub async fn enrich_event(&self, event: &mut CdcEvent) -> bool {
523        if !self.config.enrich_events {
524            return false;
525        }
526
527        // Get transaction ID from existing metadata
528        let txn_id = match &event.transaction {
529            Some(meta) => meta.id.clone(),
530            None => return false,
531        };
532
533        let data_collection = format!("{}.{}", event.schema, event.table);
534
535        if let Some(ctx) = self
536            .track_event(&txn_id, &data_collection, event.timestamp)
537            .await
538        {
539            // Update transaction metadata with order info
540            if let Some(ref mut meta) = event.transaction {
541                meta.id = ctx.id;
542                meta.sequence = ctx.total_order;
543            }
544            true
545        } else {
546            false
547        }
548    }
549}
550
551impl Clone for TransactionTopicEmitter {
552    fn clone(&self) -> Self {
553        Self {
554            config: self.config.clone(),
555            in_flight: Arc::clone(&self.in_flight),
556            stats: Arc::clone(&self.stats),
557        }
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;

    /// BEGIN events carry the composite id and no counts.
    #[test]
    fn test_transaction_event_begin() {
        let event = TransactionEvent::begin("555", "0/ABC123", 1705123456789);

        assert_eq!(event.status, TransactionStatus::Begin);
        assert_eq!(event.id, "555:0/ABC123");
        assert_eq!(event.ts_ms, 1705123456789);
        assert!(event.event_count.is_none());
        assert!(event.data_collections.is_none());
    }

    /// END events carry the total and the per-table counts as given.
    #[test]
    fn test_transaction_event_end() {
        let data_collections = vec![
            DataCollectionEventCount {
                data_collection: "public.orders".to_string(),
                event_count: 3,
            },
            DataCollectionEventCount {
                data_collection: "public.customers".to_string(),
                event_count: 2,
            },
        ];

        let event = TransactionEvent::end("555", "0/ABC123", 1705123456999, 5, data_collections);

        assert_eq!(event.status, TransactionStatus::End);
        assert_eq!(event.id, "555:0/ABC123");
        assert_eq!(event.event_count, Some(5));

        let dcs = event.data_collections.unwrap();
        assert_eq!(dcs.len(), 2);
        assert_eq!(dcs[0].data_collection, "public.orders");
        assert_eq!(dcs[0].event_count, 3);
        assert_eq!(dcs[1].data_collection, "public.customers");
        assert_eq!(dcs[1].event_count, 2);
    }

    #[test]
    fn test_transaction_event_serialization() {
        let event = TransactionEvent::begin("555", "0/ABC", 1705123456789);
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains("\"status\":\"BEGIN\""));
        assert!(json.contains("\"id\":\"555:0/ABC\""));
        assert!(json.contains("\"ts_ms\":1705123456789"));
        // Should not contain event_count or data_collections for BEGIN
        assert!(!json.contains("event_count"));
        assert!(!json.contains("data_collections"));
    }

    #[test]
    fn test_transaction_event_end_serialization() {
        let data_collections = vec![DataCollectionEventCount {
            data_collection: "public.test".to_string(),
            event_count: 10,
        }];
        let event = TransactionEvent::end("123", "0/FF", 1705123456999, 10, data_collections);
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains("\"status\":\"END\""));
        assert!(json.contains("\"event_count\":10"));
        assert!(json.contains("\"data_collections\""));
        assert!(json.contains("public.test"));
    }

    #[test]
    fn test_topic_name() {
        assert_eq!(
            TransactionEvent::topic_name("fulfillment"),
            "fulfillment.transaction"
        );
        assert_eq!(TransactionEvent::topic_name("mydb"), "mydb.transaction");
    }

    #[test]
    fn test_config_builder() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("testdb")
            .emit_begin(true)
            .emit_end(true)
            .enrich_events(false)
            .max_in_flight(500)
            .build();

        assert_eq!(config.topic_prefix, "testdb");
        assert!(config.emit_begin);
        assert!(config.emit_end);
        assert!(!config.enrich_events);
        assert_eq!(config.max_in_flight, 500);
        assert_eq!(config.transaction_topic(), "testdb.transaction");
    }

    #[tokio::test]
    async fn test_emitter_full_transaction() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("test")
            .emit_begin(true)
            .emit_end(true)
            .enrich_events(true)
            .build();

        let emitter = TransactionTopicEmitter::new(config);

        // Begin transaction
        let begin = emitter
            .begin_transaction("txn-1", "0/100", 1000)
            .await
            .unwrap();
        assert_eq!(begin.status, TransactionStatus::Begin);
        assert_eq!(begin.id, "txn-1:0/100");

        assert_eq!(emitter.in_flight_count().await, 1);

        // Track events
        let ctx1 = emitter
            .track_event("txn-1", "public.orders", 1001)
            .await
            .unwrap();
        assert_eq!(ctx1.total_order, 1);
        assert_eq!(ctx1.data_collection_order, 1);

        let ctx2 = emitter
            .track_event("txn-1", "public.orders", 1002)
            .await
            .unwrap();
        assert_eq!(ctx2.total_order, 2);
        assert_eq!(ctx2.data_collection_order, 2);

        let ctx3 = emitter
            .track_event("txn-1", "public.customers", 1003)
            .await
            .unwrap();
        assert_eq!(ctx3.total_order, 3);
        assert_eq!(ctx3.data_collection_order, 1); // First event for this table

        // End transaction
        let end = emitter.end_transaction("txn-1", 1010).await.unwrap();
        assert_eq!(end.status, TransactionStatus::End);
        assert_eq!(end.event_count, Some(3));

        let dcs = end.data_collections.unwrap();
        assert_eq!(dcs.len(), 2);

        // Look entries up by name rather than by index so the test does not
        // depend on the order of the per-table counts.
        let orders_count = dcs
            .iter()
            .find(|dc| dc.data_collection == "public.orders")
            .unwrap()
            .event_count;
        assert_eq!(orders_count, 2);

        let customers_count = dcs
            .iter()
            .find(|dc| dc.data_collection == "public.customers")
            .unwrap()
            .event_count;
        assert_eq!(customers_count, 1);

        // Should be cleared
        assert_eq!(emitter.in_flight_count().await, 0);

        // Check stats
        let stats = emitter.stats();
        assert_eq!(stats.begin_events, 1);
        assert_eq!(stats.end_events, 1);
        assert_eq!(stats.events_enriched, 3);
        assert_eq!(stats.transactions_completed, 1);
        assert_eq!(stats.in_flight_transactions, 0);
    }

    #[tokio::test]
    async fn test_emitter_orphan_end() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("test")
            .emit_end(true)
            .build();

        let emitter = TransactionTopicEmitter::new(config);

        // End without begin
        let end = emitter.end_transaction("unknown", 1000).await;
        assert!(end.is_none());

        let stats = emitter.stats();
        assert_eq!(stats.orphan_ends, 1);
        assert_eq!(stats.transactions_completed, 0);
    }

    #[tokio::test]
    async fn test_emitter_track_unknown_transaction() {
        let emitter = TransactionTopicEmitter::new(TransactionTopicConfig::default());

        // No BEGIN was registered, so there is nothing to attach the event to
        let ctx = emitter.track_event("missing", "public.test", 1000).await;
        assert!(ctx.is_none());

        assert_eq!(emitter.stats().events_enriched, 0);
    }

    #[tokio::test]
    async fn test_emitter_no_enrichment() {
        let config = TransactionTopicConfig::builder()
            .topic_prefix("test")
            .enrich_events(false)
            .build();

        let emitter = TransactionTopicEmitter::new(config);

        emitter.begin_transaction("txn-1", "0/100", 1000).await;

        // Track event but don't return context
        let ctx = emitter.track_event("txn-1", "public.test", 1001).await;
        assert!(ctx.is_none());

        let stats = emitter.stats();
        assert_eq!(stats.events_enriched, 0);

        // The event must still be counted even though no context was returned
        let end = emitter.end_transaction("txn-1", 1002).await.unwrap();
        assert_eq!(end.event_count, Some(1));
    }

    #[tokio::test]
    async fn test_emitter_clear_in_flight() {
        let config = TransactionTopicConfig::default();
        let emitter = TransactionTopicEmitter::new(config);

        emitter.begin_transaction("txn-1", "0/100", 1000).await;
        emitter.begin_transaction("txn-2", "0/200", 1001).await;

        assert_eq!(emitter.in_flight_count().await, 2);
        assert_eq!(emitter.stats().in_flight_transactions, 2);

        emitter.clear_in_flight().await;

        assert_eq!(emitter.in_flight_count().await, 0);
        assert_eq!(emitter.stats().in_flight_transactions, 0);
    }

    // Purely synchronous: no async runtime is needed for this test.
    #[test]
    fn test_transaction_context() {
        let ctx = TransactionContext::new("txn-1:0/ABC", 5, 3);

        assert_eq!(ctx.id, "txn-1:0/ABC");
        assert_eq!(ctx.total_order, 5);
        assert_eq!(ctx.data_collection_order, 3);

        // Serialize
        let json = serde_json::to_string(&ctx).unwrap();
        assert!(json.contains("\"id\":\"txn-1:0/ABC\""));
        assert!(json.contains("\"total_order\":5"));
        assert!(json.contains("\"data_collection_order\":3"));
    }

    #[test]
    fn test_data_collection_event_count_serialization() {
        let dc = DataCollectionEventCount {
            data_collection: "inventory.products".to_string(),
            event_count: 42,
        };

        let json = serde_json::to_string(&dc).unwrap();
        assert!(json.contains("\"data_collection\":\"inventory.products\""));
        assert!(json.contains("\"event_count\":42"));
    }
}