1use crate::error::StreamResult;
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24
/// Kind of change captured from the source database.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CdcOperation {
    /// A new row was inserted.
    Insert,
    /// An existing row was modified.
    Update,
    /// A row was removed.
    Delete,
    /// A row emitted by an initial snapshot rather than the change log.
    Snapshot,
    /// The entire table was truncated.
    Truncate,
    /// A DDL change on the source table (triggers schema-version tracking).
    SchemaChange,
}
41
/// A single change event captured from a source database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEvent {
    /// Unique id of this event; used as the deduplication key.
    pub id: Uuid,
    /// Database/schema/table the change originated from.
    pub source: CdcSource,
    /// The kind of change this event represents.
    pub operation: CdcOperation,
    /// Row image before the change, keyed by column name (updates/deletes).
    pub before: Option<HashMap<String, serde_json::Value>>,
    /// Row image after the change, keyed by column name (inserts/updates).
    pub after: Option<HashMap<String, serde_json::Value>>,
    /// Source transaction id; presence triggers transaction assembly.
    pub transaction_id: Option<String>,
    /// Ordering of this event within its transaction.
    pub sequence: Option<u64>,
    /// Source log position (e.g. a binlog coordinate); format is connector-specific.
    pub position: Option<String>,
    /// When the change was captured.
    pub timestamp: DateTime<Utc>,
    /// Schema version the event was produced under, if tracked.
    pub schema_version: Option<u32>,
    /// Free-form connector/transport metadata.
    pub metadata: HashMap<String, String>,
}
68
/// Identifies the origin of a CDC event; also the key for schema-version
/// tracking (hence `Eq + Hash`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CdcSource {
    /// Source database name.
    pub database: String,
    /// Optional schema/namespace (e.g. `public` for Postgres; `None` where
    /// the engine has no schema concept).
    pub schema: Option<String>,
    /// Source table name.
    pub table: String,
    /// Which CDC connector produced the event.
    pub connector: CdcConnector,
}
81
/// CDC connector technology that produced an event stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CdcConnector {
    /// Debezium (Kafka Connect based).
    Debezium,
    /// Maxwell's Daemon (MySQL binlog).
    Maxwell,
    /// Alibaba Canal (MySQL binlog).
    Canal,
    /// AWS Database Migration Service.
    AwsDms,
    /// Any other / in-house connector.
    Custom,
}
96
/// Tunables for [`CdcProcessor`]. See [`Default`] for the stock values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcConfig {
    /// Buffer events carrying a `transaction_id` until commit/rollback/timeout.
    pub detect_transactions: bool,
    /// Intended cap on buffered transaction events.
    /// NOTE(review): not enforced anywhere in this file — confirm intended use.
    pub transaction_buffer_size: usize,
    /// Age (ms) after which an open transaction is force-flushed downstream.
    pub transaction_timeout_ms: u64,
    /// Drop events whose id was already seen within the dedup window.
    pub enable_deduplication: bool,
    /// Maximum number of event ids retained in the dedup window.
    pub dedup_window_size: usize,
    /// Record schema versions from `SchemaChange` events and log transitions.
    pub track_schema_evolution: bool,
    /// Snapshot support flag.
    /// NOTE(review): not read in this file — presumably consumed elsewhere.
    pub enable_snapshot: bool,
    /// Batch size for snapshot reads.
    /// NOTE(review): not read in this file — presumably consumed elsewhere.
    pub snapshot_batch_size: usize,
    /// Metrics flag.
    /// NOTE(review): metrics are currently updated unconditionally regardless
    /// of this flag — confirm whether gating was intended.
    pub enable_metrics: bool,
}
119
120impl Default for CdcConfig {
121 fn default() -> Self {
122 Self {
123 detect_transactions: true,
124 transaction_buffer_size: 10000,
125 transaction_timeout_ms: 30000,
126 enable_deduplication: true,
127 dedup_window_size: 100000,
128 track_schema_evolution: true,
129 enable_snapshot: true,
130 snapshot_batch_size: 1000,
131 enable_metrics: true,
132 }
133 }
134}
135
/// An in-flight source transaction whose events are buffered until
/// commit, rollback, or timeout.
#[derive(Debug)]
struct Transaction {
    // Source transaction id.
    // NOTE(review): written but never read in this file (kept for Debug output).
    id: String,
    // Buffered events, in arrival order.
    events: Vec<CdcEvent>,
    // When the first event of the transaction arrived; basis for the timeout.
    started_at: DateTime<Utc>,
    // When the most recent event arrived.
    // NOTE(review): updated but never read in this file.
    last_event_at: DateTime<Utc>,
}
144
/// One dedup-window entry: the event id plus its capture timestamp.
type DedupCacheEntry = (Uuid, DateTime<Utc>);
147
/// Processes CDC events: deduplication, transaction assembly,
/// schema-version tracking, and metrics. All mutable state lives behind
/// async `RwLock`s, so a single processor can be driven from async tasks.
pub struct CdcProcessor {
    // Behavior switches; see CdcConfig.
    config: CdcConfig,
    // Open transactions keyed by source transaction id.
    active_transactions: Arc<RwLock<HashMap<String, Transaction>>>,
    // FIFO window of recently seen event ids (bounded by dedup_window_size).
    dedup_cache: Arc<RwLock<VecDeque<DedupCacheEntry>>>,
    // Last known schema version per source table.
    schema_versions: Arc<RwLock<HashMap<CdcSource, u32>>>,
    // Running counters; snapshot via get_metrics().
    metrics: Arc<RwLock<CdcMetrics>>,
}
160
/// Running counters maintained by [`CdcProcessor`].
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CdcMetrics {
    /// Total events accepted (post-dedup).
    pub events_processed: u64,
    /// Transactions flushed downstream (explicit commits and timeouts).
    pub transactions_committed: u64,
    /// Transactions discarded via rollback.
    pub transactions_rolled_back: u64,
    /// Events dropped as duplicates.
    pub deduplicated_events: u64,
    /// Schema-version transitions observed.
    pub schema_changes_detected: u64,
    /// Snapshot-operation events seen.
    pub snapshot_events: u64,
    /// Insert-operation events seen.
    pub inserts: u64,
    /// Update-operation events seen.
    pub updates: u64,
    /// Delete-operation events seen.
    pub deletes: u64,
    /// Running mean of committed transaction sizes (events per transaction).
    pub avg_transaction_size: f64,
    /// Largest committed transaction observed (events).
    pub max_transaction_size: usize,
}
176
177impl CdcProcessor {
178 pub fn new(config: CdcConfig) -> Self {
180 Self {
181 config,
182 active_transactions: Arc::new(RwLock::new(HashMap::new())),
183 dedup_cache: Arc::new(RwLock::new(VecDeque::new())),
184 schema_versions: Arc::new(RwLock::new(HashMap::new())),
185 metrics: Arc::new(RwLock::new(CdcMetrics::default())),
186 }
187 }
188
189 pub async fn process_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
191 if self.config.enable_deduplication && self.is_duplicate(&event).await? {
193 let mut metrics = self.metrics.write().await;
194 metrics.deduplicated_events += 1;
195 debug!("Deduplicated CDC event: {}", event.id);
196 return Ok(vec![]);
197 }
198
199 if self.config.track_schema_evolution {
201 self.track_schema_version(&event).await?;
202 }
203
204 self.update_metrics(&event).await;
206
207 if self.config.detect_transactions && event.transaction_id.is_some() {
209 self.handle_transaction_event(event).await
210 } else {
211 Ok(vec![event])
213 }
214 }
215
216 async fn is_duplicate(&self, event: &CdcEvent) -> StreamResult<bool> {
218 let cache = self.dedup_cache.read().await;
219 Ok(cache.iter().any(|(id, _)| *id == event.id))
220 }
221
222 async fn track_schema_version(&self, event: &CdcEvent) -> StreamResult<()> {
224 if event.operation == CdcOperation::SchemaChange {
225 if let Some(version) = event.schema_version {
226 let mut versions = self.schema_versions.write().await;
227 let old_version = versions.insert(event.source.clone(), version);
228
229 if old_version != Some(version) {
230 info!(
231 "Schema version changed for {}.{}: {:?} -> {}",
232 event.source.database, event.source.table, old_version, version
233 );
234
235 let mut metrics = self.metrics.write().await;
236 metrics.schema_changes_detected += 1;
237 }
238 }
239 }
240 Ok(())
241 }
242
243 async fn handle_transaction_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
245 let tx_id = event
246 .transaction_id
247 .clone()
248 .expect("transaction_id should be present for transaction events");
249 let mut transactions = self.active_transactions.write().await;
250
251 let now = Utc::now();
252
253 let transaction = transactions
255 .entry(tx_id.clone())
256 .or_insert_with(|| Transaction {
257 id: tx_id.clone(),
258 events: Vec::new(),
259 started_at: now,
260 last_event_at: now,
261 });
262
263 transaction.events.push(event.clone());
264 transaction.last_event_at = now;
265
266 let timeout_ms = self.config.transaction_timeout_ms as i64;
268 if (now - transaction.started_at).num_milliseconds() > timeout_ms {
269 warn!(
270 "Transaction {} timed out after {} events",
271 tx_id,
272 transaction.events.len()
273 );
274
275 let events = transaction.events.clone();
277 transactions.remove(&tx_id);
278
279 let mut metrics = self.metrics.write().await;
280 let prev_count = metrics.transactions_committed;
281 metrics.transactions_committed += 1;
282 metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
283 + events.len() as f64)
284 / metrics.transactions_committed as f64;
285 metrics.max_transaction_size = metrics.max_transaction_size.max(events.len());
286
287 return Ok(events);
288 }
289
290 Ok(vec![])
292 }
293
294 pub async fn commit_transaction(&self, transaction_id: &str) -> StreamResult<Vec<CdcEvent>> {
296 let mut transactions = self.active_transactions.write().await;
297
298 if let Some(transaction) = transactions.remove(transaction_id) {
299 info!(
300 "Committing transaction {} with {} events",
301 transaction_id,
302 transaction.events.len()
303 );
304
305 let mut metrics = self.metrics.write().await;
306 let prev_count = metrics.transactions_committed;
307 metrics.transactions_committed += 1;
308 metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
309 + transaction.events.len() as f64)
310 / metrics.transactions_committed as f64;
311 metrics.max_transaction_size =
312 metrics.max_transaction_size.max(transaction.events.len());
313
314 Ok(transaction.events)
315 } else {
316 warn!(
317 "Attempted to commit unknown transaction: {}",
318 transaction_id
319 );
320 Ok(vec![])
321 }
322 }
323
324 pub async fn rollback_transaction(&self, transaction_id: &str) -> StreamResult<()> {
326 let mut transactions = self.active_transactions.write().await;
327
328 if let Some(transaction) = transactions.remove(transaction_id) {
329 warn!(
330 "Rolling back transaction {} with {} events",
331 transaction_id,
332 transaction.events.len()
333 );
334
335 let mut metrics = self.metrics.write().await;
336 metrics.transactions_rolled_back += 1;
337 }
338
339 Ok(())
340 }
341
342 async fn update_metrics(&self, event: &CdcEvent) {
344 let mut metrics = self.metrics.write().await;
345 metrics.events_processed += 1;
346
347 match event.operation {
348 CdcOperation::Insert => metrics.inserts += 1,
349 CdcOperation::Update => metrics.updates += 1,
350 CdcOperation::Delete => metrics.deletes += 1,
351 CdcOperation::Snapshot => metrics.snapshot_events += 1,
352 _ => {}
353 }
354
355 if self.config.enable_deduplication {
357 let mut cache = self.dedup_cache.write().await;
358 cache.push_back((event.id, event.timestamp));
359
360 while cache.len() > self.config.dedup_window_size {
362 cache.pop_front();
363 }
364 }
365 }
366
367 pub async fn get_metrics(&self) -> CdcMetrics {
369 self.metrics.read().await.clone()
370 }
371
372 pub fn to_custom_event_data(cdc_event: &CdcEvent) -> serde_json::Value {
377 serde_json::to_value(cdc_event).unwrap_or(serde_json::Value::Null)
378 }
379
380 pub fn from_json(data: &serde_json::Value) -> StreamResult<CdcEvent> {
382 serde_json::from_value(data.clone())
383 .map_err(|e| crate::error::StreamError::Deserialization(e.to_string()))
384 }
385}
386
/// Fluent builder for [`CdcEvent`]; construct with [`CdcEventBuilder::new`],
/// chain setters, then call `build()`.
pub struct CdcEventBuilder {
    // Event under construction; id and timestamp are assigned in new().
    event: CdcEvent,
}
391
392impl CdcEventBuilder {
393 pub fn new(source: CdcSource, operation: CdcOperation) -> Self {
394 Self {
395 event: CdcEvent {
396 id: Uuid::new_v4(),
397 source,
398 operation,
399 before: None,
400 after: None,
401 transaction_id: None,
402 sequence: None,
403 position: None,
404 timestamp: Utc::now(),
405 schema_version: None,
406 metadata: HashMap::new(),
407 },
408 }
409 }
410
411 pub fn before(mut self, data: HashMap<String, serde_json::Value>) -> Self {
412 self.event.before = Some(data);
413 self
414 }
415
416 pub fn after(mut self, data: HashMap<String, serde_json::Value>) -> Self {
417 self.event.after = Some(data);
418 self
419 }
420
421 pub fn transaction(mut self, tx_id: String, sequence: u64) -> Self {
422 self.event.transaction_id = Some(tx_id);
423 self.event.sequence = Some(sequence);
424 self
425 }
426
427 pub fn position(mut self, pos: String) -> Self {
428 self.event.position = Some(pos);
429 self
430 }
431
432 pub fn schema_version(mut self, version: u32) -> Self {
433 self.event.schema_version = Some(version);
434 self
435 }
436
437 pub fn metadata(mut self, key: String, value: String) -> Self {
438 self.event.metadata.insert(key, value);
439 self
440 }
441
442 pub fn build(self) -> CdcEvent {
443 self.event
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed Debezium/Postgres-style source shared by all tests.
    fn create_test_source() -> CdcSource {
        CdcSource {
            database: "testdb".to_string(),
            schema: Some("public".to_string()),
            table: "users".to_string(),
            connector: CdcConnector::Debezium,
        }
    }

    #[tokio::test]
    async fn test_cdc_processor_creation() {
        let config = CdcConfig::default();
        let processor = CdcProcessor::new(config);
        let metrics = processor.get_metrics().await;
        // A fresh processor starts with zeroed metrics.
        assert_eq!(metrics.events_processed, 0);
    }

    #[tokio::test]
    async fn test_single_event_processing() {
        let processor = CdcProcessor::new(CdcConfig::default());

        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .after(HashMap::from([
                ("id".to_string(), serde_json::json!(1)),
                ("name".to_string(), serde_json::json!("Alice")),
            ]))
            .build();

        // A non-transactional event passes straight through.
        let result = processor.process_event(event).await.unwrap();
        assert_eq!(result.len(), 1);

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.events_processed, 1);
        assert_eq!(metrics.inserts, 1);
    }

    #[tokio::test]
    async fn test_transaction_assembly() {
        let processor = CdcProcessor::new(CdcConfig::default());

        let event1 = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .transaction("tx123".to_string(), 1)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        let event2 = CdcEventBuilder::new(create_test_source(), CdcOperation::Update)
            .transaction("tx123".to_string(), 2)
            .before(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .after(HashMap::from([
                ("id".to_string(), serde_json::json!(1)),
                ("status".to_string(), serde_json::json!("active")),
            ]))
            .build();

        let result1 = processor.process_event(event1).await.unwrap();
        let result2 = processor.process_event(event2).await.unwrap();

        // Both events are buffered until the transaction commits.
        assert_eq!(result1.len(), 0);
        assert_eq!(result2.len(), 0);

        let committed = processor.commit_transaction("tx123").await.unwrap();
        assert_eq!(committed.len(), 2);

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.transactions_committed, 1);
        assert_eq!(metrics.avg_transaction_size, 2.0);
    }

    #[tokio::test]
    async fn test_deduplication() {
        let processor = CdcProcessor::new(CdcConfig {
            enable_deduplication: true,
            dedup_window_size: 100,
            detect_transactions: false,
            ..Default::default()
        });

        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        // First occurrence passes through.
        let result1 = processor.process_event(event.clone()).await.unwrap();
        assert_eq!(result1.len(), 1);

        // Same event id again is dropped by the dedup window.
        let result2 = processor.process_event(event).await.unwrap();
        assert_eq!(result2.len(), 0);

        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.deduplicated_events, 1);
    }

    #[tokio::test]
    async fn test_schema_version_tracking() {
        let processor = CdcProcessor::new(CdcConfig {
            track_schema_evolution: true,
            ..Default::default()
        });

        let source = create_test_source();

        let schema_event = CdcEventBuilder::new(source.clone(), CdcOperation::SchemaChange)
            .schema_version(2)
            .build();

        processor.process_event(schema_event).await.unwrap();

        // First sighting of a version counts as a schema change.
        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.schema_changes_detected, 1);
    }

    #[tokio::test]
    async fn test_transaction_rollback() {
        let processor = CdcProcessor::new(CdcConfig::default());

        let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .transaction("tx456".to_string(), 1)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        processor.process_event(event).await.unwrap();
        processor.rollback_transaction("tx456").await.unwrap();

        // Rollback discards the buffered events without committing.
        let metrics = processor.get_metrics().await;
        assert_eq!(metrics.transactions_rolled_back, 1);
        assert_eq!(metrics.transactions_committed, 0);
    }

    #[tokio::test]
    async fn test_event_builder() {
        let source = create_test_source();
        let event = CdcEventBuilder::new(source.clone(), CdcOperation::Update)
            .before(HashMap::from([(
                "status".to_string(),
                serde_json::json!("inactive"),
            )]))
            .after(HashMap::from([(
                "status".to_string(),
                serde_json::json!("active"),
            )]))
            .transaction("tx789".to_string(), 5)
            .position("mysql-bin.000001:1234".to_string())
            .schema_version(3)
            .metadata("connector".to_string(), "debezium".to_string())
            .build();

        assert_eq!(event.source, source);
        assert_eq!(event.operation, CdcOperation::Update);
        assert!(event.before.is_some());
        assert!(event.after.is_some());
        assert_eq!(event.transaction_id, Some("tx789".to_string()));
        assert_eq!(event.sequence, Some(5));
        assert_eq!(event.position, Some("mysql-bin.000001:1234".to_string()));
        assert_eq!(event.schema_version, Some(3));
        assert_eq!(
            event.metadata.get("connector"),
            Some(&"debezium".to_string())
        );
    }

    #[tokio::test]
    async fn test_json_conversion() {
        let cdc_event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
            .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
            .build();

        // Serialize to JSON and back; identity must survive the round-trip.
        let json_data = CdcProcessor::to_custom_event_data(&cdc_event);
        assert!(json_data.is_object());

        let converted_back = CdcProcessor::from_json(&json_data).unwrap();
        assert_eq!(converted_back.id, cdc_event.id);
        assert_eq!(converted_back.operation, cdc_event.operation);
    }
}