// oxirs_stream/cdc_processor.rs
//! # Change Data Capture (CDC) Stream Processor
//!
//! Advanced CDC capabilities for streaming database changes, event sourcing,
//! and real-time data integration with full support for inserts, updates, deletes,
//! and schema evolution.
//!
//! ## Features
//! - Multiple CDC patterns (Debezium, Maxwell, Canal compatibility)
//! - Transaction boundary detection
//! - Schema evolution tracking
//! - Snapshot + incremental sync
//! - Deduplication and idempotency
//! - Backpressure handling for large transactions
//! - Metrics and monitoring

use crate::error::StreamResult;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
24
/// CDC operation types following industry standards
/// (Debezium/Maxwell-style op codes, plus snapshot and DDL markers).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CdcOperation {
    /// Insert/Create operation
    Insert,
    /// Update/Modify operation
    Update,
    /// Delete/Remove operation
    Delete,
    /// Snapshot read (initial state capture)
    Snapshot,
    /// Truncate table operation
    Truncate,
    /// Schema change (DDL); the only variant acted on by schema tracking
    SchemaChange,
}
41
/// CDC event representing a single database change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEvent {
    /// Unique event identifier (used as the deduplication key)
    pub id: Uuid,
    /// Source database/table identifier
    pub source: CdcSource,
    /// Operation type
    pub operation: CdcOperation,
    /// Data before the change (for updates/deletes)
    pub before: Option<HashMap<String, serde_json::Value>>,
    /// Data after the change (for inserts/updates)
    pub after: Option<HashMap<String, serde_json::Value>>,
    /// Transaction identifier; when present (and detection is enabled)
    /// the event is buffered until the transaction commits or times out
    pub transaction_id: Option<String>,
    /// Transaction sequence number
    pub sequence: Option<u64>,
    /// Log position (LSN/binlog position/etc)
    pub position: Option<String>,
    /// Event timestamp
    pub timestamp: DateTime<Utc>,
    /// Schema version
    pub schema_version: Option<u32>,
    /// Free-form metadata key/value pairs
    pub metadata: HashMap<String, String>,
}
68
/// CDC source identifier.
///
/// Hash/Eq derives allow use as a key (e.g. in the schema-version map).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CdcSource {
    /// Database name
    pub database: String,
    /// Schema name (optional; e.g. `public` for PostgreSQL)
    pub schema: Option<String>,
    /// Table name
    pub table: String,
    /// Source connector type
    pub connector: CdcConnector,
}
81
/// CDC connector types this processor understands.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CdcConnector {
    /// Debezium-compatible format
    Debezium,
    /// Maxwell format (MySQL)
    Maxwell,
    /// Canal format (MySQL)
    Canal,
    /// AWS DMS format
    AwsDms,
    /// Custom connector
    Custom,
}
96
/// CDC processor configuration.
///
/// See `Default` for the out-of-the-box values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcConfig {
    /// Enable transaction boundary detection (buffer events until commit)
    pub detect_transactions: bool,
    /// Buffer size for transaction assembly
    /// NOTE(review): not currently enforced by the processor — TODO confirm intent
    pub transaction_buffer_size: usize,
    /// Transaction timeout in milliseconds (incomplete transactions are
    /// force-committed once exceeded)
    pub transaction_timeout_ms: u64,
    /// Enable deduplication based on event ID
    pub enable_deduplication: bool,
    /// Deduplication window size (max number of remembered event IDs)
    pub dedup_window_size: usize,
    /// Enable schema evolution tracking
    pub track_schema_evolution: bool,
    /// Enable snapshot mode
    pub enable_snapshot: bool,
    /// Snapshot batch size
    pub snapshot_batch_size: usize,
    /// Enable metrics collection
    pub enable_metrics: bool,
}
119
120impl Default for CdcConfig {
121    fn default() -> Self {
122        Self {
123            detect_transactions: true,
124            transaction_buffer_size: 10000,
125            transaction_timeout_ms: 30000,
126            enable_deduplication: true,
127            dedup_window_size: 100000,
128            track_schema_evolution: true,
129            enable_snapshot: true,
130            snapshot_batch_size: 1000,
131            enable_metrics: true,
132        }
133    }
134}
135
/// Transaction assembler for multi-event transactions.
///
/// Lives in `CdcProcessor::active_transactions` until committed,
/// rolled back, or flushed by the timeout path.
#[derive(Debug)]
struct Transaction {
    /// Transaction identifier as reported by the source
    id: String,
    /// Events buffered so far, in arrival order
    events: Vec<CdcEvent>,
    /// When the first event of this transaction arrived (timeout baseline)
    started_at: DateTime<Utc>,
    /// When the most recent event arrived
    last_event_at: DateTime<Utc>,
}
144
/// Deduplication cache entry type (event ID, timestamp).
/// Only the ID is consulted for duplicate checks; eviction is count-based.
type DedupCacheEntry = (Uuid, DateTime<Utc>);
147
/// CDC processor with transaction handling and deduplication.
///
/// All state lives behind `Arc<RwLock<...>>`, so a single processor can be
/// shared across tasks; every method takes `&self`.
pub struct CdcProcessor {
    config: CdcConfig,
    /// Active transactions being assembled, keyed by transaction ID
    active_transactions: Arc<RwLock<HashMap<String, Transaction>>>,
    /// Deduplication cache (event ID -> timestamp), oldest entries evicted
    /// once `config.dedup_window_size` is exceeded
    dedup_cache: Arc<RwLock<VecDeque<DedupCacheEntry>>>,
    /// Last observed schema version per source
    schema_versions: Arc<RwLock<HashMap<CdcSource, u32>>>,
    /// Processing metrics
    metrics: Arc<RwLock<CdcMetrics>>,
}
160
/// CDC processing metrics (cloned out via `CdcProcessor::get_metrics`).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CdcMetrics {
    /// Total events accepted (excludes deduplicated events)
    pub events_processed: u64,
    /// Transactions committed, including timeout-forced commits
    pub transactions_committed: u64,
    /// Transactions explicitly rolled back
    pub transactions_rolled_back: u64,
    /// Events dropped as duplicates
    pub deduplicated_events: u64,
    /// Schema version changes observed
    pub schema_changes_detected: u64,
    /// Snapshot-read events seen
    pub snapshot_events: u64,
    /// Insert operations seen
    pub inserts: u64,
    /// Update operations seen
    pub updates: u64,
    /// Delete operations seen
    pub deletes: u64,
    /// Running average of committed transaction sizes (in events)
    pub avg_transaction_size: f64,
    /// Largest committed transaction size (in events)
    pub max_transaction_size: usize,
}
176
177impl CdcProcessor {
178    /// Create a new CDC processor
179    pub fn new(config: CdcConfig) -> Self {
180        Self {
181            config,
182            active_transactions: Arc::new(RwLock::new(HashMap::new())),
183            dedup_cache: Arc::new(RwLock::new(VecDeque::new())),
184            schema_versions: Arc::new(RwLock::new(HashMap::new())),
185            metrics: Arc::new(RwLock::new(CdcMetrics::default())),
186        }
187    }
188
189    /// Process a CDC event
190    pub async fn process_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
191        // Deduplication check
192        if self.config.enable_deduplication && self.is_duplicate(&event).await? {
193            let mut metrics = self.metrics.write().await;
194            metrics.deduplicated_events += 1;
195            debug!("Deduplicated CDC event: {}", event.id);
196            return Ok(vec![]);
197        }
198
199        // Update schema version tracking
200        if self.config.track_schema_evolution {
201            self.track_schema_version(&event).await?;
202        }
203
204        // Update metrics
205        self.update_metrics(&event).await;
206
207        // Handle transaction boundaries
208        if self.config.detect_transactions && event.transaction_id.is_some() {
209            self.handle_transaction_event(event).await
210        } else {
211            // Non-transactional event - emit immediately
212            Ok(vec![event])
213        }
214    }
215
216    /// Check if event is a duplicate
217    async fn is_duplicate(&self, event: &CdcEvent) -> StreamResult<bool> {
218        let cache = self.dedup_cache.read().await;
219        Ok(cache.iter().any(|(id, _)| *id == event.id))
220    }
221
222    /// Track schema version changes
223    async fn track_schema_version(&self, event: &CdcEvent) -> StreamResult<()> {
224        if event.operation == CdcOperation::SchemaChange {
225            if let Some(version) = event.schema_version {
226                let mut versions = self.schema_versions.write().await;
227                let old_version = versions.insert(event.source.clone(), version);
228
229                if old_version != Some(version) {
230                    info!(
231                        "Schema version changed for {}.{}: {:?} -> {}",
232                        event.source.database, event.source.table, old_version, version
233                    );
234
235                    let mut metrics = self.metrics.write().await;
236                    metrics.schema_changes_detected += 1;
237                }
238            }
239        }
240        Ok(())
241    }
242
243    /// Handle transaction event assembly
244    async fn handle_transaction_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
245        let tx_id = event
246            .transaction_id
247            .clone()
248            .expect("transaction_id should be present for transaction events");
249        let mut transactions = self.active_transactions.write().await;
250
251        let now = Utc::now();
252
253        // Get or create transaction
254        let transaction = transactions
255            .entry(tx_id.clone())
256            .or_insert_with(|| Transaction {
257                id: tx_id.clone(),
258                events: Vec::new(),
259                started_at: now,
260                last_event_at: now,
261            });
262
263        transaction.events.push(event.clone());
264        transaction.last_event_at = now;
265
266        // Check for transaction timeout
267        let timeout_ms = self.config.transaction_timeout_ms as i64;
268        if (now - transaction.started_at).num_milliseconds() > timeout_ms {
269            warn!(
270                "Transaction {} timed out after {} events",
271                tx_id,
272                transaction.events.len()
273            );
274
275            // Commit incomplete transaction
276            let events = transaction.events.clone();
277            transactions.remove(&tx_id);
278
279            let mut metrics = self.metrics.write().await;
280            let prev_count = metrics.transactions_committed;
281            metrics.transactions_committed += 1;
282            metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
283                + events.len() as f64)
284                / metrics.transactions_committed as f64;
285            metrics.max_transaction_size = metrics.max_transaction_size.max(events.len());
286
287            return Ok(events);
288        }
289
290        // Transaction assembly continues
291        Ok(vec![])
292    }
293
294    /// Commit a transaction (call when transaction end marker received)
295    pub async fn commit_transaction(&self, transaction_id: &str) -> StreamResult<Vec<CdcEvent>> {
296        let mut transactions = self.active_transactions.write().await;
297
298        if let Some(transaction) = transactions.remove(transaction_id) {
299            info!(
300                "Committing transaction {} with {} events",
301                transaction_id,
302                transaction.events.len()
303            );
304
305            let mut metrics = self.metrics.write().await;
306            let prev_count = metrics.transactions_committed;
307            metrics.transactions_committed += 1;
308            metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
309                + transaction.events.len() as f64)
310                / metrics.transactions_committed as f64;
311            metrics.max_transaction_size =
312                metrics.max_transaction_size.max(transaction.events.len());
313
314            Ok(transaction.events)
315        } else {
316            warn!(
317                "Attempted to commit unknown transaction: {}",
318                transaction_id
319            );
320            Ok(vec![])
321        }
322    }
323
324    /// Rollback a transaction (discard events)
325    pub async fn rollback_transaction(&self, transaction_id: &str) -> StreamResult<()> {
326        let mut transactions = self.active_transactions.write().await;
327
328        if let Some(transaction) = transactions.remove(transaction_id) {
329            warn!(
330                "Rolling back transaction {} with {} events",
331                transaction_id,
332                transaction.events.len()
333            );
334
335            let mut metrics = self.metrics.write().await;
336            metrics.transactions_rolled_back += 1;
337        }
338
339        Ok(())
340    }
341
342    /// Update processing metrics
343    async fn update_metrics(&self, event: &CdcEvent) {
344        let mut metrics = self.metrics.write().await;
345        metrics.events_processed += 1;
346
347        match event.operation {
348            CdcOperation::Insert => metrics.inserts += 1,
349            CdcOperation::Update => metrics.updates += 1,
350            CdcOperation::Delete => metrics.deletes += 1,
351            CdcOperation::Snapshot => metrics.snapshot_events += 1,
352            _ => {}
353        }
354
355        // Maintain deduplication cache
356        if self.config.enable_deduplication {
357            let mut cache = self.dedup_cache.write().await;
358            cache.push_back((event.id, event.timestamp));
359
360            // Trim cache to configured size
361            while cache.len() > self.config.dedup_window_size {
362                cache.pop_front();
363            }
364        }
365    }
366
367    /// Get current processing metrics
368    pub async fn get_metrics(&self) -> CdcMetrics {
369        self.metrics.read().await.clone()
370    }
371
372    /// Convert CDC event to StreamEvent (Custom event variant)
373    /// Note: Since StreamEvent is an enum focused on RDF operations,
374    /// CDC events are best handled separately or through a custom event type.
375    /// This is a placeholder for potential future integration.
376    pub fn to_custom_event_data(cdc_event: &CdcEvent) -> serde_json::Value {
377        serde_json::to_value(cdc_event).unwrap_or(serde_json::Value::Null)
378    }
379
380    /// Parse CDC event from JSON data
381    pub fn from_json(data: &serde_json::Value) -> StreamResult<CdcEvent> {
382        serde_json::from_value(data.clone())
383            .map_err(|e| crate::error::StreamError::Deserialization(e.to_string()))
384    }
385}
386
/// CDC event builder for convenient event construction.
/// Consumed by `build()`; all setters take and return `self` for chaining.
pub struct CdcEventBuilder {
    // The event under construction; id and timestamp are set in `new`.
    event: CdcEvent,
}
391
392impl CdcEventBuilder {
393    pub fn new(source: CdcSource, operation: CdcOperation) -> Self {
394        Self {
395            event: CdcEvent {
396                id: Uuid::new_v4(),
397                source,
398                operation,
399                before: None,
400                after: None,
401                transaction_id: None,
402                sequence: None,
403                position: None,
404                timestamp: Utc::now(),
405                schema_version: None,
406                metadata: HashMap::new(),
407            },
408        }
409    }
410
411    pub fn before(mut self, data: HashMap<String, serde_json::Value>) -> Self {
412        self.event.before = Some(data);
413        self
414    }
415
416    pub fn after(mut self, data: HashMap<String, serde_json::Value>) -> Self {
417        self.event.after = Some(data);
418        self
419    }
420
421    pub fn transaction(mut self, tx_id: String, sequence: u64) -> Self {
422        self.event.transaction_id = Some(tx_id);
423        self.event.sequence = Some(sequence);
424        self
425    }
426
427    pub fn position(mut self, pos: String) -> Self {
428        self.event.position = Some(pos);
429        self
430    }
431
432    pub fn schema_version(mut self, version: u32) -> Self {
433        self.event.schema_version = Some(version);
434        self
435    }
436
437    pub fn metadata(mut self, key: String, value: String) -> Self {
438        self.event.metadata.insert(key, value);
439        self
440    }
441
442    pub fn build(self) -> CdcEvent {
443        self.event
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450
451    fn create_test_source() -> CdcSource {
452        CdcSource {
453            database: "testdb".to_string(),
454            schema: Some("public".to_string()),
455            table: "users".to_string(),
456            connector: CdcConnector::Debezium,
457        }
458    }
459
460    #[tokio::test]
461    async fn test_cdc_processor_creation() {
462        let config = CdcConfig::default();
463        let processor = CdcProcessor::new(config);
464        let metrics = processor.get_metrics().await;
465        assert_eq!(metrics.events_processed, 0);
466    }
467
468    #[tokio::test]
469    async fn test_single_event_processing() {
470        let processor = CdcProcessor::new(CdcConfig::default());
471
472        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
473            .after(HashMap::from([
474                ("id".to_string(), serde_json::json!(1)),
475                ("name".to_string(), serde_json::json!("Alice")),
476            ]))
477            .build();
478
479        let result = processor.process_event(event).await.unwrap();
480        assert_eq!(result.len(), 1);
481
482        let metrics = processor.get_metrics().await;
483        assert_eq!(metrics.events_processed, 1);
484        assert_eq!(metrics.inserts, 1);
485    }
486
487    #[tokio::test]
488    async fn test_transaction_assembly() {
489        let processor = CdcProcessor::new(CdcConfig::default());
490
491        // Event 1 in transaction
492        let event1 = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
493            .transaction("tx123".to_string(), 1)
494            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
495            .build();
496
497        // Event 2 in same transaction
498        let event2 = CdcEventBuilder::new(create_test_source(), CdcOperation::Update)
499            .transaction("tx123".to_string(), 2)
500            .before(HashMap::from([("id".to_string(), serde_json::json!(1))]))
501            .after(HashMap::from([
502                ("id".to_string(), serde_json::json!(1)),
503                ("status".to_string(), serde_json::json!("active")),
504            ]))
505            .build();
506
507        // Process events - should buffer them
508        let result1 = processor.process_event(event1).await.unwrap();
509        let result2 = processor.process_event(event2).await.unwrap();
510
511        assert_eq!(result1.len(), 0); // Buffered
512        assert_eq!(result2.len(), 0); // Buffered
513
514        // Commit transaction
515        let committed = processor.commit_transaction("tx123").await.unwrap();
516        assert_eq!(committed.len(), 2);
517
518        let metrics = processor.get_metrics().await;
519        assert_eq!(metrics.transactions_committed, 1);
520        assert_eq!(metrics.avg_transaction_size, 2.0);
521    }
522
523    #[tokio::test]
524    async fn test_deduplication() {
525        let processor = CdcProcessor::new(CdcConfig {
526            enable_deduplication: true,
527            dedup_window_size: 100,
528            detect_transactions: false,
529            ..Default::default()
530        });
531
532        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
533            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
534            .build();
535
536        // First processing
537        let result1 = processor.process_event(event.clone()).await.unwrap();
538        assert_eq!(result1.len(), 1);
539
540        // Duplicate - should be filtered
541        let result2 = processor.process_event(event).await.unwrap();
542        assert_eq!(result2.len(), 0);
543
544        let metrics = processor.get_metrics().await;
545        assert_eq!(metrics.deduplicated_events, 1);
546    }
547
548    #[tokio::test]
549    async fn test_schema_version_tracking() {
550        let processor = CdcProcessor::new(CdcConfig {
551            track_schema_evolution: true,
552            ..Default::default()
553        });
554
555        let source = create_test_source();
556
557        let schema_event = CdcEventBuilder::new(source.clone(), CdcOperation::SchemaChange)
558            .schema_version(2)
559            .build();
560
561        processor.process_event(schema_event).await.unwrap();
562
563        let metrics = processor.get_metrics().await;
564        assert_eq!(metrics.schema_changes_detected, 1);
565    }
566
567    #[tokio::test]
568    async fn test_transaction_rollback() {
569        let processor = CdcProcessor::new(CdcConfig::default());
570
571        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
572            .transaction("tx456".to_string(), 1)
573            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
574            .build();
575
576        processor.process_event(event).await.unwrap();
577        processor.rollback_transaction("tx456").await.unwrap();
578
579        let metrics = processor.get_metrics().await;
580        assert_eq!(metrics.transactions_rolled_back, 1);
581        assert_eq!(metrics.transactions_committed, 0);
582    }
583
584    #[tokio::test]
585    async fn test_event_builder() {
586        let source = create_test_source();
587        let event = CdcEventBuilder::new(source.clone(), CdcOperation::Update)
588            .before(HashMap::from([(
589                "status".to_string(),
590                serde_json::json!("inactive"),
591            )]))
592            .after(HashMap::from([(
593                "status".to_string(),
594                serde_json::json!("active"),
595            )]))
596            .transaction("tx789".to_string(), 5)
597            .position("mysql-bin.000001:1234".to_string())
598            .schema_version(3)
599            .metadata("connector".to_string(), "debezium".to_string())
600            .build();
601
602        assert_eq!(event.source, source);
603        assert_eq!(event.operation, CdcOperation::Update);
604        assert!(event.before.is_some());
605        assert!(event.after.is_some());
606        assert_eq!(event.transaction_id, Some("tx789".to_string()));
607        assert_eq!(event.sequence, Some(5));
608        assert_eq!(event.position, Some("mysql-bin.000001:1234".to_string()));
609        assert_eq!(event.schema_version, Some(3));
610        assert_eq!(
611            event.metadata.get("connector"),
612            Some(&"debezium".to_string())
613        );
614    }
615
616    #[tokio::test]
617    async fn test_json_conversion() {
618        let cdc_event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
619            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
620            .build();
621
622        let json_data = CdcProcessor::to_custom_event_data(&cdc_event);
623        assert!(json_data.is_object());
624
625        let converted_back = CdcProcessor::from_json(&json_data).unwrap();
626        assert_eq!(converted_back.id, cdc_event.id);
627        assert_eq!(converted_back.operation, cdc_event.operation);
628    }
629}