1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//! Traits for CDC sources
//!
//! Database-agnostic trait definitions.
use crate::common::Result;
use async_trait::async_trait;
/// Trait for CDC source implementations
///
/// Implement this trait to create a CDC source for a specific database.
#[async_trait]
pub trait CdcSource: Send + Sync {
/// Start capturing changes
///
/// This begins the CDC process. For streaming sources, this typically
/// spawns a background task that processes the replication stream.
async fn start(&mut self) -> Result<()>;
/// Stop capturing changes
///
/// Gracefully stop the CDC process. This should signal any background
/// tasks to stop and wait for them to finish.
async fn stop(&mut self) -> Result<()>;
/// Check if the source is healthy
///
/// Returns true if the source is connected and processing events.
async fn is_healthy(&self) -> bool;
}
/// Configuration trait for CDC sources
pub trait CdcConfig: Send + Sync {
/// Get the source type name (e.g., "postgres", "mysql")
fn source_type(&self) -> &'static str;
/// Get the connection string
fn connection_string(&self) -> &str;
/// Validate the configuration
fn validate(&self) -> Result<()>;
}
/// Trait for type mappers (database types → Avro)
pub trait TypeMapper {
/// Map a database column type to an Avro schema JSON representation
fn to_avro_json(&self, column_type: &str, nullable: bool) -> serde_json::Value;
/// Generate a complete Avro schema for a table
fn table_to_avro_schema(
&self,
namespace: &str,
table: &str,
columns: &[(String, String, bool)], // (name, type, nullable)
) -> anyhow::Result<apache_avro::Schema>;
}
#[cfg(test)]
mod tests {
use super::*;
struct MockCdcSource {
active: bool,
}
#[async_trait]
impl CdcSource for MockCdcSource {
async fn start(&mut self) -> Result<()> {
self.active = true;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.active = false;
Ok(())
}
async fn is_healthy(&self) -> bool {
self.active
}
}
#[tokio::test]
async fn test_mock_source() {
let mut source = MockCdcSource { active: false };
assert!(!source.is_healthy().await);
source.start().await.unwrap();
assert!(source.is_healthy().await);
source.stop().await.unwrap();
assert!(!source.is_healthy().await);
}
}