1use crate::error::StreamResult;
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24
/// The kind of change a CDC event describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CdcOperation {
    /// A new row was created.
    Insert,
    /// An existing row was modified.
    Update,
    /// A row was removed.
    Delete,
    /// A row emitted during an initial snapshot rather than live capture.
    Snapshot,
    /// The whole table was truncated.
    Truncate,
    /// The table's schema was altered.
    SchemaChange,
}
41
/// A single change-data-capture event as produced by a source connector.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEvent {
    /// Unique id for this event; used by the deduplication cache.
    pub id: Uuid,
    /// Which database/schema/table the change came from.
    pub source: CdcSource,
    /// What kind of change this event represents.
    pub operation: CdcOperation,
    /// Row image before the change, when the connector provides one.
    pub before: Option<HashMap<String, serde_json::Value>>,
    /// Row image after the change, when the connector provides one.
    pub after: Option<HashMap<String, serde_json::Value>>,
    /// Source transaction id; presence triggers transaction buffering.
    pub transaction_id: Option<String>,
    /// Ordering of the event within its transaction, if known.
    pub sequence: Option<u64>,
    /// Source log position — format is connector-specific
    /// (e.g. "mysql-bin.000001:1234" in the tests below).
    pub position: Option<String>,
    /// When the change was captured.
    pub timestamp: DateTime<Utc>,
    /// Schema version in effect for this event, if reported.
    pub schema_version: Option<u32>,
    /// Free-form key/value metadata attached by the producer.
    pub metadata: HashMap<String, String>,
}
68
/// Identifies the table a CDC event originated from.
/// Hashable so it can key per-source state (see `schema_versions`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CdcSource {
    /// Database name.
    pub database: String,
    /// Schema name, for engines that have one (e.g. PostgreSQL's "public").
    pub schema: Option<String>,
    /// Table name.
    pub table: String,
    /// Which connector technology produced the event.
    pub connector: CdcConnector,
}
81
/// The CDC connector technology that produced an event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CdcConnector {
    /// Debezium.
    Debezium,
    /// Maxwell's Daemon.
    Maxwell,
    /// Alibaba Canal.
    Canal,
    /// AWS Database Migration Service.
    AwsDms,
    /// Any other / in-house connector.
    Custom,
}
96
/// Tunables for `CdcProcessor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcConfig {
    /// Buffer events that carry a transaction id until commit/rollback/timeout.
    pub detect_transactions: bool,
    /// NOTE(review): not enforced anywhere in this file — presumably the
    /// intended cap on buffered events per transaction; confirm.
    pub transaction_buffer_size: usize,
    /// Age (ms) after which a still-open transaction is flushed when its
    /// next event arrives.
    pub transaction_timeout_ms: u64,
    /// Drop events whose id was already seen within the dedup window.
    pub enable_deduplication: bool,
    /// Maximum number of recent event ids retained for deduplication.
    pub dedup_window_size: usize,
    /// Record per-source schema versions and count changes.
    pub track_schema_evolution: bool,
    /// NOTE(review): read nowhere in this file — snapshot support appears
    /// unimplemented here; confirm against the rest of the crate.
    pub enable_snapshot: bool,
    /// NOTE(review): read nowhere in this file.
    pub snapshot_batch_size: usize,
    /// NOTE(review): metrics are updated unconditionally; this flag is not
    /// checked anywhere in this file.
    pub enable_metrics: bool,
}
119
impl Default for CdcConfig {
    /// Defaults: transaction detection and dedup on, 30 s transaction
    /// timeout, 100 000-entry dedup window.
    fn default() -> Self {
        Self {
            detect_transactions: true,
            transaction_buffer_size: 10000,
            transaction_timeout_ms: 30000,
            enable_deduplication: true,
            dedup_window_size: 100000,
            track_schema_evolution: true,
            enable_snapshot: true,
            snapshot_batch_size: 1000,
            enable_metrics: true,
        }
    }
}
135
/// An in-flight transaction whose events are buffered until commit,
/// rollback, or timeout.
#[derive(Debug)]
struct Transaction {
    // Source transaction id (also the key in `active_transactions`).
    id: String,
    // Events received so far, in arrival order.
    events: Vec<CdcEvent>,
    // When the first event of this transaction was seen (timeout anchor).
    started_at: DateTime<Utc>,
    // When the most recent event was seen.
    last_event_at: DateTime<Utc>,
}
144
/// Dedup cache entry: an event id paired with that event's own timestamp.
type DedupCacheEntry = (Uuid, DateTime<Utc>);
147
/// Stateful CDC pipeline stage: deduplicates events, assembles
/// transactions, tracks schema versions, and records metrics.
pub struct CdcProcessor {
    // Static configuration set at construction.
    config: CdcConfig,
    // Open transactions keyed by transaction id.
    active_transactions: Arc<RwLock<HashMap<String, Transaction>>>,
    // FIFO of recently-seen event ids, bounded by `dedup_window_size`.
    dedup_cache: Arc<RwLock<VecDeque<DedupCacheEntry>>>,
    // Last schema version observed per source.
    schema_versions: Arc<RwLock<HashMap<CdcSource, u32>>>,
    // Running counters; snapshotted via `get_metrics`.
    metrics: Arc<RwLock<CdcMetrics>>,
}
160
/// Counters accumulated by `CdcProcessor`; cloned out via `get_metrics`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CdcMetrics {
    /// Total events accepted (deduplicated events excluded).
    pub events_processed: u64,
    /// Transactions flushed via explicit commit or timeout.
    pub transactions_committed: u64,
    /// Transactions discarded via rollback.
    pub transactions_rolled_back: u64,
    /// Events dropped as duplicates.
    pub deduplicated_events: u64,
    /// Schema-version changes observed per source.
    pub schema_changes_detected: u64,
    /// Events with `CdcOperation::Snapshot`.
    pub snapshot_events: u64,
    pub inserts: u64,
    pub updates: u64,
    pub deletes: u64,
    /// Running mean of committed-transaction sizes.
    pub avg_transaction_size: f64,
    /// Largest committed transaction seen.
    pub max_transaction_size: usize,
}
176
177impl CdcProcessor {
178 pub fn new(config: CdcConfig) -> Self {
180 Self {
181 config,
182 active_transactions: Arc::new(RwLock::new(HashMap::new())),
183 dedup_cache: Arc::new(RwLock::new(VecDeque::new())),
184 schema_versions: Arc::new(RwLock::new(HashMap::new())),
185 metrics: Arc::new(RwLock::new(CdcMetrics::default())),
186 }
187 }
188
189 pub async fn process_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
191 if self.config.enable_deduplication && self.is_duplicate(&event).await? {
193 let mut metrics = self.metrics.write().await;
194 metrics.deduplicated_events += 1;
195 debug!("Deduplicated CDC event: {}", event.id);
196 return Ok(vec![]);
197 }
198
199 if self.config.track_schema_evolution {
201 self.track_schema_version(&event).await?;
202 }
203
204 self.update_metrics(&event).await;
206
207 if self.config.detect_transactions && event.transaction_id.is_some() {
209 self.handle_transaction_event(event).await
210 } else {
211 Ok(vec![event])
213 }
214 }
215
216 async fn is_duplicate(&self, event: &CdcEvent) -> StreamResult<bool> {
218 let cache = self.dedup_cache.read().await;
219 Ok(cache.iter().any(|(id, _)| *id == event.id))
220 }
221
222 async fn track_schema_version(&self, event: &CdcEvent) -> StreamResult<()> {
224 if event.operation == CdcOperation::SchemaChange {
225 if let Some(version) = event.schema_version {
226 let mut versions = self.schema_versions.write().await;
227 let old_version = versions.insert(event.source.clone(), version);
228
229 if old_version != Some(version) {
230 info!(
231 "Schema version changed for {}.{}: {:?} -> {}",
232 event.source.database, event.source.table, old_version, version
233 );
234
235 let mut metrics = self.metrics.write().await;
236 metrics.schema_changes_detected += 1;
237 }
238 }
239 }
240 Ok(())
241 }
242
243 async fn handle_transaction_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
245 let tx_id = event.transaction_id.clone().unwrap();
246 let mut transactions = self.active_transactions.write().await;
247
248 let now = Utc::now();
249
250 let transaction = transactions
252 .entry(tx_id.clone())
253 .or_insert_with(|| Transaction {
254 id: tx_id.clone(),
255 events: Vec::new(),
256 started_at: now,
257 last_event_at: now,
258 });
259
260 transaction.events.push(event.clone());
261 transaction.last_event_at = now;
262
263 let timeout_ms = self.config.transaction_timeout_ms as i64;
265 if (now - transaction.started_at).num_milliseconds() > timeout_ms {
266 warn!(
267 "Transaction {} timed out after {} events",
268 tx_id,
269 transaction.events.len()
270 );
271
272 let events = transaction.events.clone();
274 transactions.remove(&tx_id);
275
276 let mut metrics = self.metrics.write().await;
277 let prev_count = metrics.transactions_committed;
278 metrics.transactions_committed += 1;
279 metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
280 + events.len() as f64)
281 / metrics.transactions_committed as f64;
282 metrics.max_transaction_size = metrics.max_transaction_size.max(events.len());
283
284 return Ok(events);
285 }
286
287 Ok(vec![])
289 }
290
291 pub async fn commit_transaction(&self, transaction_id: &str) -> StreamResult<Vec<CdcEvent>> {
293 let mut transactions = self.active_transactions.write().await;
294
295 if let Some(transaction) = transactions.remove(transaction_id) {
296 info!(
297 "Committing transaction {} with {} events",
298 transaction_id,
299 transaction.events.len()
300 );
301
302 let mut metrics = self.metrics.write().await;
303 let prev_count = metrics.transactions_committed;
304 metrics.transactions_committed += 1;
305 metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
306 + transaction.events.len() as f64)
307 / metrics.transactions_committed as f64;
308 metrics.max_transaction_size =
309 metrics.max_transaction_size.max(transaction.events.len());
310
311 Ok(transaction.events)
312 } else {
313 warn!(
314 "Attempted to commit unknown transaction: {}",
315 transaction_id
316 );
317 Ok(vec![])
318 }
319 }
320
321 pub async fn rollback_transaction(&self, transaction_id: &str) -> StreamResult<()> {
323 let mut transactions = self.active_transactions.write().await;
324
325 if let Some(transaction) = transactions.remove(transaction_id) {
326 warn!(
327 "Rolling back transaction {} with {} events",
328 transaction_id,
329 transaction.events.len()
330 );
331
332 let mut metrics = self.metrics.write().await;
333 metrics.transactions_rolled_back += 1;
334 }
335
336 Ok(())
337 }
338
339 async fn update_metrics(&self, event: &CdcEvent) {
341 let mut metrics = self.metrics.write().await;
342 metrics.events_processed += 1;
343
344 match event.operation {
345 CdcOperation::Insert => metrics.inserts += 1,
346 CdcOperation::Update => metrics.updates += 1,
347 CdcOperation::Delete => metrics.deletes += 1,
348 CdcOperation::Snapshot => metrics.snapshot_events += 1,
349 _ => {}
350 }
351
352 if self.config.enable_deduplication {
354 let mut cache = self.dedup_cache.write().await;
355 cache.push_back((event.id, event.timestamp));
356
357 while cache.len() > self.config.dedup_window_size {
359 cache.pop_front();
360 }
361 }
362 }
363
364 pub async fn get_metrics(&self) -> CdcMetrics {
366 self.metrics.read().await.clone()
367 }
368
369 pub fn to_custom_event_data(cdc_event: &CdcEvent) -> serde_json::Value {
374 serde_json::to_value(cdc_event).unwrap_or(serde_json::Value::Null)
375 }
376
377 pub fn from_json(data: &serde_json::Value) -> StreamResult<CdcEvent> {
379 serde_json::from_value(data.clone())
380 .map_err(|e| crate::error::StreamError::Deserialization(e.to_string()))
381 }
382}
383
/// Fluent builder for `CdcEvent`; start with `CdcEventBuilder::new` and
/// finish with `build`.
pub struct CdcEventBuilder {
    // The event being assembled; consumed and returned by `build`.
    event: CdcEvent,
}
388
389impl CdcEventBuilder {
390 pub fn new(source: CdcSource, operation: CdcOperation) -> Self {
391 Self {
392 event: CdcEvent {
393 id: Uuid::new_v4(),
394 source,
395 operation,
396 before: None,
397 after: None,
398 transaction_id: None,
399 sequence: None,
400 position: None,
401 timestamp: Utc::now(),
402 schema_version: None,
403 metadata: HashMap::new(),
404 },
405 }
406 }
407
408 pub fn before(mut self, data: HashMap<String, serde_json::Value>) -> Self {
409 self.event.before = Some(data);
410 self
411 }
412
413 pub fn after(mut self, data: HashMap<String, serde_json::Value>) -> Self {
414 self.event.after = Some(data);
415 self
416 }
417
418 pub fn transaction(mut self, tx_id: String, sequence: u64) -> Self {
419 self.event.transaction_id = Some(tx_id);
420 self.event.sequence = Some(sequence);
421 self
422 }
423
424 pub fn position(mut self, pos: String) -> Self {
425 self.event.position = Some(pos);
426 self
427 }
428
429 pub fn schema_version(mut self, version: u32) -> Self {
430 self.event.schema_version = Some(version);
431 self
432 }
433
434 pub fn metadata(mut self, key: String, value: String) -> Self {
435 self.event.metadata.insert(key, value);
436 self
437 }
438
439 pub fn build(self) -> CdcEvent {
440 self.event
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    // Shared fixture: a Debezium source for testdb.public.users.
    fn create_test_source() -> CdcSource {
        CdcSource {
            database: "testdb".to_string(),
            schema: Some("public".to_string()),
            table: "users".to_string(),
            connector: CdcConnector::Debezium,
        }
    }

    // A fresh processor starts with zeroed metrics.
    #[tokio::test]
    async fn test_cdc_processor_creation() {
        let config = CdcConfig::default();
        let processor = CdcProcessor::new(config);
        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.events_processed, 0);
    }

    // A non-transactional insert passes straight through and is counted.
    #[tokio::test]
    async fn test_single_event_processing() {
        let processor = CdcProcessor::new(CdcConfig::default());

        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .after(HashMap::from([
                ("id".to_string(), serde_json::json!(1)),
                ("name".to_string(), serde_json::json!("Alice")),
            ]))
            .build();

        let result = processor.process_event(event).await.unwrap();
        assert_eq!(result.len(), 1);

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.events_processed, 1);
        assert_eq!(metrics.inserts, 1);
    }

    // Transactional events are buffered (empty results) until an explicit
    // commit releases them all at once and updates the averages.
    #[tokio::test]
    async fn test_transaction_assembly() {
        let processor = CdcProcessor::new(CdcConfig::default());

        let event1 = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .transaction("tx123".to_string(), 1)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        let event2 = CdcEventBuilder::new(create_test_source(), CdcOperation::Update)
            .transaction("tx123".to_string(), 2)
            .before(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .after(HashMap::from([
                ("id".to_string(), serde_json::json!(1)),
                ("status".to_string(), serde_json::json!("active")),
            ]))
            .build();

        let result1 = processor.process_event(event1).await.unwrap();
        let result2 = processor.process_event(event2).await.unwrap();

        assert_eq!(result1.len(), 0); assert_eq!(result2.len(), 0); let committed = processor.commit_transaction("tx123").await.unwrap();
        assert_eq!(committed.len(), 2);

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.transactions_committed, 1);
        assert_eq!(metrics.avg_transaction_size, 2.0);
    }

    // Replaying the exact same event (same id) is filtered out and counted.
    #[tokio::test]
    async fn test_deduplication() {
        let processor = CdcProcessor::new(CdcConfig {
            enable_deduplication: true,
            dedup_window_size: 100,
            detect_transactions: false,
            ..Default::default()
        });

        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        let result1 = processor.process_event(event.clone()).await.unwrap();
        assert_eq!(result1.len(), 1);

        let result2 = processor.process_event(event).await.unwrap();
        assert_eq!(result2.len(), 0);

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.deduplicated_events, 1);
    }

    // A SchemaChange event with a new version bumps the change counter.
    #[tokio::test]
    async fn test_schema_version_tracking() {
        let processor = CdcProcessor::new(CdcConfig {
            track_schema_evolution: true,
            ..Default::default()
        });

        let source = create_test_source();

        let schema_event = CdcEventBuilder::new(source.clone(), CdcOperation::SchemaChange)
            .schema_version(2)
            .build();

        processor.process_event(schema_event).await.unwrap();

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.schema_changes_detected, 1);
    }

    // Rolling back a buffered transaction discards it without committing.
    #[tokio::test]
    async fn test_transaction_rollback() {
        let processor = CdcProcessor::new(CdcConfig::default());

        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .transaction("tx456".to_string(), 1)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        processor.process_event(event).await.unwrap();
        processor.rollback_transaction("tx456").await.unwrap();

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.transactions_rolled_back, 1);
        assert_eq!(metrics.transactions_committed, 0);
    }

    // Every builder setter lands in the corresponding event field.
    #[tokio::test]
    async fn test_event_builder() {
        let source = create_test_source();
        let event = CdcEventBuilder::new(source.clone(), CdcOperation::Update)
            .before(HashMap::from([(
                "status".to_string(),
                serde_json::json!("inactive"),
            )]))
            .after(HashMap::from([(
                "status".to_string(),
                serde_json::json!("active"),
            )]))
            .transaction("tx789".to_string(), 5)
            .position("mysql-bin.000001:1234".to_string())
            .schema_version(3)
            .metadata("connector".to_string(), "debezium".to_string())
            .build();

        assert_eq!(event.source, source);
        assert_eq!(event.operation, CdcOperation::Update);
        assert!(event.before.is_some());
        assert!(event.after.is_some());
        assert_eq!(event.transaction_id, Some("tx789".to_string()));
        assert_eq!(event.sequence, Some(5));
        assert_eq!(event.position, Some("mysql-bin.000001:1234".to_string()));
        assert_eq!(event.schema_version, Some(3));
        assert_eq!(
            event.metadata.get("connector"),
            Some(&"debezium".to_string())
        );
    }

    // Round trip: CdcEvent -> serde_json::Value -> CdcEvent.
    #[tokio::test]
    async fn test_json_conversion() {
        let cdc_event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        let json_data = CdcProcessor::to_custom_event_data(&cdc_event);
        assert!(json_data.is_object());

        let converted_back = CdcProcessor::from_json(&json_data).unwrap();
        assert_eq!(converted_back.id, cdc_event.id);
        assert_eq!(converted_back.operation, cdc_event.operation);
    }
}