use crate::error::StreamResult;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
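/// The kind of change a CDC event describes.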
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CdcOperation {
Insert,
Update,
Delete,
Snapshot,
Truncate,
SchemaChange,
}
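/// A single change event captured from a source table, carrying the optional
/// row images before and after the change along with transaction and log
/// position context.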
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEvent {
pub id: Uuid,
pub source: CdcSource,
pub operation: CdcOperation,
pub before: Option<HashMap<String, serde_json::Value>>,
pub after: Option<HashMap<String, serde_json::Value>>,
pub transaction_id: Option<String>,
pub sequence: Option<u64>,
pub position: Option<String>,
pub timestamp: DateTime<Utc>,
pub schema_version: Option<u32>,
pub metadata: HashMap<String, String>,
}
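/// Identifies the database table an event originated from and the connector
/// that captured it.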
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CdcSource {
pub database: String,
pub schema: Option<String>,
pub table: String,
pub connector: CdcConnector,
}
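/// The upstream change-capture connector that produced an event.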
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CdcConnector {
Debezium,
Maxwell,
Canal,
AwsDms,
Custom,
}
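/// Tuning knobs for [`CdcProcessor`]: transaction assembly, deduplication,
/// schema tracking, snapshot handling, and metrics collection.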
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcConfig {
pub detect_transactions: bool,
pub transaction_buffer_size: usize,
pub transaction_timeout_ms: u64,
pub enable_deduplication: bool,
pub dedup_window_size: usize,
pub track_schema_evolution: bool,
pub enable_snapshot: bool,
pub snapshot_batch_size: usize,
pub enable_metrics: bool,
}
impl Default for CdcConfig {
fn default() -> Self {
Self {
detect_transactions: true,
transaction_buffer_size: 10000,
transaction_timeout_ms: 30000,
enable_deduplication: true,
dedup_window_size: 100000,
track_schema_evolution: true,
enable_snapshot: true,
snapshot_batch_size: 1000,
enable_metrics: true,
}
}
}
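/// Events buffered for a single in-flight source transaction.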
#[derive(Debug)]
struct Transaction {
id: String,
events: Vec<CdcEvent>,
started_at: DateTime<Utc>,
last_event_at: DateTime<Utc>,
}
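/// Event id plus timestamp kept in a bounded FIFO window for duplicate
/// detection; eviction is count-based (`dedup_window_size`).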
type DedupCacheEntry = (Uuid, DateTime<Utc>);
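/// Processes CDC events: deduplicates them, tracks schema versions, assembles
/// transactions, and maintains processing metrics.
///
/// Minimal usage sketch (assumes a tokio runtime; `event` is a [`CdcEvent`]
/// and `"tx1"` a transaction id chosen for illustration):
///
/// ```ignore
/// let processor = CdcProcessor::new(CdcConfig::default());
/// let ready = processor.process_event(event).await?; // buffered if transactional
/// let committed = processor.commit_transaction("tx1").await?;
/// ```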
pub struct CdcProcessor {
config: CdcConfig,
active_transactions: Arc<RwLock<HashMap<String, Transaction>>>,
dedup_cache: Arc<RwLock<VecDeque<DedupCacheEntry>>>,
schema_versions: Arc<RwLock<HashMap<CdcSource, u32>>>,
metrics: Arc<RwLock<CdcMetrics>>,
}
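/// Counters accumulated while processing events, plus running statistics on
/// transaction sizes.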
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CdcMetrics {
pub events_processed: u64,
pub transactions_committed: u64,
pub transactions_rolled_back: u64,
pub deduplicated_events: u64,
pub schema_changes_detected: u64,
pub snapshot_events: u64,
pub inserts: u64,
pub updates: u64,
pub deletes: u64,
pub avg_transaction_size: f64,
pub max_transaction_size: usize,
}
impl CdcProcessor {
pub fn new(config: CdcConfig) -> Self {
Self {
config,
active_transactions: Arc::new(RwLock::new(HashMap::new())),
dedup_cache: Arc::new(RwLock::new(VecDeque::new())),
schema_versions: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(RwLock::new(CdcMetrics::default())),
}
}
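    /// Runs an event through deduplication, schema tracking, and metrics, then
    /// either buffers it under its transaction id or returns it immediately.
    /// The returned events are those ready for downstream consumption.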
pub async fn process_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
if self.config.enable_deduplication && self.is_duplicate(&event).await? {
let mut metrics = self.metrics.write().await;
metrics.deduplicated_events += 1;
debug!("Deduplicated CDC event: {}", event.id);
return Ok(vec![]);
}
if self.config.track_schema_evolution {
self.track_schema_version(&event).await?;
}
self.update_metrics(&event).await;
if self.config.detect_transactions && event.transaction_id.is_some() {
self.handle_transaction_event(event).await
} else {
Ok(vec![event])
}
}
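    /// Checks the bounded dedup window for the event id (linear scan, O(window)).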
async fn is_duplicate(&self, event: &CdcEvent) -> StreamResult<bool> {
let cache = self.dedup_cache.read().await;
Ok(cache.iter().any(|(id, _)| *id == event.id))
}
async fn track_schema_version(&self, event: &CdcEvent) -> StreamResult<()> {
if event.operation == CdcOperation::SchemaChange {
if let Some(version) = event.schema_version {
let mut versions = self.schema_versions.write().await;
let old_version = versions.insert(event.source.clone(), version);
if old_version != Some(version) {
info!(
"Schema version changed for {}.{}: {:?} -> {}",
event.source.database, event.source.table, old_version, version
);
let mut metrics = self.metrics.write().await;
metrics.schema_changes_detected += 1;
}
}
}
Ok(())
}
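    /// Buffers the event under its transaction id. If the transaction has been
    /// open longer than `transaction_timeout_ms`, its buffered events are
    /// flushed downstream and counted as committed.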
async fn handle_transaction_event(&self, event: CdcEvent) -> StreamResult<Vec<CdcEvent>> {
let tx_id = event
.transaction_id
.clone()
.expect("transaction_id should be present for transaction events");
let mut transactions = self.active_transactions.write().await;
let now = Utc::now();
let transaction = transactions
.entry(tx_id.clone())
.or_insert_with(|| Transaction {
id: tx_id.clone(),
events: Vec::new(),
started_at: now,
last_event_at: now,
});
        transaction.events.push(event);
transaction.last_event_at = now;
let timeout_ms = self.config.transaction_timeout_ms as i64;
if (now - transaction.started_at).num_milliseconds() > timeout_ms {
warn!(
"Transaction {} timed out after {} events",
tx_id,
transaction.events.len()
);
let events = transaction.events.clone();
transactions.remove(&tx_id);
let mut metrics = self.metrics.write().await;
let prev_count = metrics.transactions_committed;
metrics.transactions_committed += 1;
metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
+ events.len() as f64)
/ metrics.transactions_committed as f64;
metrics.max_transaction_size = metrics.max_transaction_size.max(events.len());
return Ok(events);
}
Ok(vec![])
}
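    /// Removes the transaction's buffer and returns its events, updating the
    /// running average and maximum transaction size. Unknown transaction ids
    /// are logged and yield an empty result.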
pub async fn commit_transaction(&self, transaction_id: &str) -> StreamResult<Vec<CdcEvent>> {
let mut transactions = self.active_transactions.write().await;
if let Some(transaction) = transactions.remove(transaction_id) {
info!(
"Committing transaction {} with {} events",
transaction_id,
transaction.events.len()
);
let mut metrics = self.metrics.write().await;
let prev_count = metrics.transactions_committed;
metrics.transactions_committed += 1;
metrics.avg_transaction_size = (metrics.avg_transaction_size * prev_count as f64
+ transaction.events.len() as f64)
/ metrics.transactions_committed as f64;
metrics.max_transaction_size =
metrics.max_transaction_size.max(transaction.events.len());
Ok(transaction.events)
} else {
warn!(
"Attempted to commit unknown transaction: {}",
transaction_id
);
Ok(vec![])
}
}
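    /// Discards any buffered events for the transaction.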
pub async fn rollback_transaction(&self, transaction_id: &str) -> StreamResult<()> {
let mut transactions = self.active_transactions.write().await;
if let Some(transaction) = transactions.remove(transaction_id) {
warn!(
"Rolling back transaction {} with {} events",
transaction_id,
transaction.events.len()
);
let mut metrics = self.metrics.write().await;
metrics.transactions_rolled_back += 1;
}
Ok(())
}
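    /// Updates per-operation counters and appends the event to the bounded
    /// dedup window, evicting the oldest entries once the window is full.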
async fn update_metrics(&self, event: &CdcEvent) {
let mut metrics = self.metrics.write().await;
metrics.events_processed += 1;
match event.operation {
CdcOperation::Insert => metrics.inserts += 1,
CdcOperation::Update => metrics.updates += 1,
CdcOperation::Delete => metrics.deletes += 1,
CdcOperation::Snapshot => metrics.snapshot_events += 1,
_ => {}
}
if self.config.enable_deduplication {
let mut cache = self.dedup_cache.write().await;
cache.push_back((event.id, event.timestamp));
while cache.len() > self.config.dedup_window_size {
cache.pop_front();
}
}
}
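    /// Returns a snapshot of the current metrics.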
pub async fn get_metrics(&self) -> CdcMetrics {
self.metrics.read().await.clone()
}
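    /// Serializes the event to JSON, falling back to `Null` if serialization fails.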
pub fn to_custom_event_data(cdc_event: &CdcEvent) -> serde_json::Value {
serde_json::to_value(cdc_event).unwrap_or(serde_json::Value::Null)
}
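    /// Deserializes an event from a JSON value, mapping failures into the
    /// crate's deserialization error.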
pub fn from_json(data: &serde_json::Value) -> StreamResult<CdcEvent> {
serde_json::from_value(data.clone())
.map_err(|e| crate::error::StreamError::Deserialization(e.to_string()))
}
}
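/// Fluent builder for [`CdcEvent`], filling in a fresh id and timestamp.
///
/// Sketch of building an insert event (`source` and `"tx1"` are illustrative):
///
/// ```ignore
/// let event = CdcEventBuilder::new(source, CdcOperation::Insert)
///     .after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
///     .transaction("tx1".to_string(), 1)
///     .build();
/// ```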
pub struct CdcEventBuilder {
event: CdcEvent,
}
impl CdcEventBuilder {
pub fn new(source: CdcSource, operation: CdcOperation) -> Self {
Self {
event: CdcEvent {
id: Uuid::new_v4(),
source,
operation,
before: None,
after: None,
transaction_id: None,
sequence: None,
position: None,
timestamp: Utc::now(),
schema_version: None,
metadata: HashMap::new(),
},
}
}
pub fn before(mut self, data: HashMap<String, serde_json::Value>) -> Self {
self.event.before = Some(data);
self
}
pub fn after(mut self, data: HashMap<String, serde_json::Value>) -> Self {
self.event.after = Some(data);
self
}
pub fn transaction(mut self, tx_id: String, sequence: u64) -> Self {
self.event.transaction_id = Some(tx_id);
self.event.sequence = Some(sequence);
self
}
pub fn position(mut self, pos: String) -> Self {
self.event.position = Some(pos);
self
}
pub fn schema_version(mut self, version: u32) -> Self {
self.event.schema_version = Some(version);
self
}
pub fn metadata(mut self, key: String, value: String) -> Self {
self.event.metadata.insert(key, value);
self
}
pub fn build(self) -> CdcEvent {
self.event
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_source() -> CdcSource {
CdcSource {
database: "testdb".to_string(),
schema: Some("public".to_string()),
table: "users".to_string(),
connector: CdcConnector::Debezium,
}
}
#[tokio::test]
async fn test_cdc_processor_creation() {
let config = CdcConfig::default();
let processor = CdcProcessor::new(config);
let metrics = processor.get_metrics().await;
assert_eq!(metrics.events_processed, 0);
}
#[tokio::test]
async fn test_single_event_processing() {
let processor = CdcProcessor::new(CdcConfig::default());
let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
.after(HashMap::from([
("id".to_string(), serde_json::json!(1)),
("name".to_string(), serde_json::json!("Alice")),
]))
.build();
let result = processor.process_event(event).await.unwrap();
assert_eq!(result.len(), 1);
let metrics = processor.get_metrics().await;
assert_eq!(metrics.events_processed, 1);
assert_eq!(metrics.inserts, 1);
}
#[tokio::test]
async fn test_transaction_assembly() {
let processor = CdcProcessor::new(CdcConfig::default());
let event1 = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
.transaction("tx123".to_string(), 1)
.after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
.build();
let event2 = CdcEventBuilder::new(create_test_source(), CdcOperation::Update)
.transaction("tx123".to_string(), 2)
.before(HashMap::from([("id".to_string(), serde_json::json!(1))]))
.after(HashMap::from([
("id".to_string(), serde_json::json!(1)),
("status".to_string(), serde_json::json!("active")),
]))
.build();
let result1 = processor.process_event(event1).await.unwrap();
let result2 = processor.process_event(event2).await.unwrap();
        assert_eq!(result1.len(), 0);
        assert_eq!(result2.len(), 0);
let committed = processor.commit_transaction("tx123").await.unwrap();
assert_eq!(committed.len(), 2);
let metrics = processor.get_metrics().await;
assert_eq!(metrics.transactions_committed, 1);
assert_eq!(metrics.avg_transaction_size, 2.0);
}
#[tokio::test]
async fn test_deduplication() {
let processor = CdcProcessor::new(CdcConfig {
enable_deduplication: true,
dedup_window_size: 100,
detect_transactions: false,
..Default::default()
});
let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
.after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
.build();
let result1 = processor.process_event(event.clone()).await.unwrap();
assert_eq!(result1.len(), 1);
let result2 = processor.process_event(event).await.unwrap();
assert_eq!(result2.len(), 0);
let metrics = processor.get_metrics().await;
assert_eq!(metrics.deduplicated_events, 1);
}
#[tokio::test]
async fn test_schema_version_tracking() {
let processor = CdcProcessor::new(CdcConfig {
track_schema_evolution: true,
..Default::default()
});
let source = create_test_source();
let schema_event = CdcEventBuilder::new(source.clone(), CdcOperation::SchemaChange)
.schema_version(2)
.build();
processor.process_event(schema_event).await.unwrap();
let metrics = processor.get_metrics().await;
assert_eq!(metrics.schema_changes_detected, 1);
}
#[tokio::test]
async fn test_transaction_rollback() {
let processor = CdcProcessor::new(CdcConfig::default());
let event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
.transaction("tx456".to_string(), 1)
.after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
.build();
processor.process_event(event).await.unwrap();
processor.rollback_transaction("tx456").await.unwrap();
let metrics = processor.get_metrics().await;
assert_eq!(metrics.transactions_rolled_back, 1);
assert_eq!(metrics.transactions_committed, 0);
}
#[tokio::test]
async fn test_event_builder() {
let source = create_test_source();
let event = CdcEventBuilder::new(source.clone(), CdcOperation::Update)
.before(HashMap::from([(
"status".to_string(),
serde_json::json!("inactive"),
)]))
.after(HashMap::from([(
"status".to_string(),
serde_json::json!("active"),
)]))
.transaction("tx789".to_string(), 5)
.position("mysql-bin.000001:1234".to_string())
.schema_version(3)
.metadata("connector".to_string(), "debezium".to_string())
.build();
assert_eq!(event.source, source);
assert_eq!(event.operation, CdcOperation::Update);
assert!(event.before.is_some());
assert!(event.after.is_some());
assert_eq!(event.transaction_id, Some("tx789".to_string()));
assert_eq!(event.sequence, Some(5));
assert_eq!(event.position, Some("mysql-bin.000001:1234".to_string()));
assert_eq!(event.schema_version, Some(3));
assert_eq!(
event.metadata.get("connector"),
Some(&"debezium".to_string())
);
}
#[tokio::test]
async fn test_json_conversion() {
let cdc_event = CdcEventBuilder::new(create_test_source(), CdcOperation::Insert)
.after(HashMap::from([("id".to_string(), serde_json::json!(1))]))
.build();
let json_data = CdcProcessor::to_custom_event_data(&cdc_event);
assert!(json_data.is_object());
let converted_back = CdcProcessor::from_json(&json_data).unwrap();
assert_eq!(converted_back.id, cdc_event.id);
assert_eq!(converted_back.operation, cdc_event.operation);
}
}