1use crate::event::{Event, EventData, EventType};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19pub enum ChangeType {
20 Insert,
21 Update,
22 Delete,
23 Truncate,
24}
25
26impl From<ChangeType> for EventType {
27 fn from(ct: ChangeType) -> Self {
28 match ct {
29 ChangeType::Insert => EventType::Created,
30 ChangeType::Update => EventType::Updated,
31 ChangeType::Delete => EventType::Deleted,
32 ChangeType::Truncate => EventType::Custom("truncate".to_string()),
33 }
34 }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct ChangeEvent {
44 pub change_type: ChangeType,
45 pub source: ChangeSource,
46 pub timestamp: u64,
47 pub key: Option<String>,
48 pub before: Option<serde_json::Value>,
49 pub after: Option<serde_json::Value>,
50 pub metadata: HashMap<String, String>,
51}
52
53impl ChangeEvent {
54 pub fn insert(source: ChangeSource, key: String, data: serde_json::Value) -> Self {
56 Self {
57 change_type: ChangeType::Insert,
58 source,
59 timestamp: current_timestamp(),
60 key: Some(key),
61 before: None,
62 after: Some(data),
63 metadata: HashMap::new(),
64 }
65 }
66
67 pub fn update(
69 source: ChangeSource,
70 key: String,
71 before: serde_json::Value,
72 after: serde_json::Value,
73 ) -> Self {
74 Self {
75 change_type: ChangeType::Update,
76 source,
77 timestamp: current_timestamp(),
78 key: Some(key),
79 before: Some(before),
80 after: Some(after),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn delete(source: ChangeSource, key: String, data: serde_json::Value) -> Self {
87 Self {
88 change_type: ChangeType::Delete,
89 source,
90 timestamp: current_timestamp(),
91 key: Some(key),
92 before: Some(data),
93 after: None,
94 metadata: HashMap::new(),
95 }
96 }
97
98 pub fn truncate(source: ChangeSource) -> Self {
100 Self {
101 change_type: ChangeType::Truncate,
102 source,
103 timestamp: current_timestamp(),
104 key: None,
105 before: None,
106 after: None,
107 metadata: HashMap::new(),
108 }
109 }
110
111 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
113 self.metadata.insert(key.into(), value.into());
114 self
115 }
116
117 pub fn to_event(&self) -> Event {
119 let source = format!("{}.{}", self.source.database, self.source.table);
120 let data = serde_json::json!({
121 "change_type": format!("{:?}", self.change_type),
122 "key": self.key,
123 "before": self.before,
124 "after": self.after,
125 });
126
127 Event::new(self.change_type.into(), source, EventData::Json(data))
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct ChangeSource {
138 pub database: String,
139 pub table: String,
140 pub schema: Option<String>,
141}
142
143impl ChangeSource {
144 pub fn new(database: impl Into<String>, table: impl Into<String>) -> Self {
145 Self {
146 database: database.into(),
147 table: table.into(),
148 schema: None,
149 }
150 }
151
152 pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
153 self.schema = Some(schema.into());
154 self
155 }
156
157 pub fn full_name(&self) -> String {
158 match &self.schema {
159 Some(s) => format!("{}.{}.{}", self.database, s, self.table),
160 None => format!("{}.{}", self.database, self.table),
161 }
162 }
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct CdcConfig {
172 pub enabled: bool,
173 pub sources: Vec<CdcSourceConfig>,
174 pub batch_size: usize,
175 pub batch_timeout_ms: u64,
176 pub include_before: bool,
177 pub include_schema: bool,
178}
179
180impl Default for CdcConfig {
181 fn default() -> Self {
182 Self {
183 enabled: true,
184 sources: Vec::new(),
185 batch_size: 100,
186 batch_timeout_ms: 1000,
187 include_before: true,
188 include_schema: false,
189 }
190 }
191}
192
193impl CdcConfig {
194 pub fn new() -> Self {
195 Self::default()
196 }
197
198 pub fn add_source(&mut self, source: CdcSourceConfig) {
199 self.sources.push(source);
200 }
201
202 pub fn with_source(mut self, source: CdcSourceConfig) -> Self {
203 self.sources.push(source);
204 self
205 }
206
207 pub fn with_batch_size(mut self, size: usize) -> Self {
208 self.batch_size = size;
209 self
210 }
211
212 pub fn with_batch_timeout(mut self, timeout_ms: u64) -> Self {
213 self.batch_timeout_ms = timeout_ms;
214 self
215 }
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct CdcSourceConfig {
225 pub source: ChangeSource,
226 pub change_types: Vec<ChangeType>,
227 pub columns: Option<Vec<String>>,
228 pub filter: Option<String>,
229}
230
231impl CdcSourceConfig {
232 pub fn new(database: impl Into<String>, table: impl Into<String>) -> Self {
233 Self {
234 source: ChangeSource::new(database, table),
235 change_types: vec![ChangeType::Insert, ChangeType::Update, ChangeType::Delete],
236 columns: None,
237 filter: None,
238 }
239 }
240
241 pub fn inserts_only(mut self) -> Self {
242 self.change_types = vec![ChangeType::Insert];
243 self
244 }
245
246 pub fn updates_only(mut self) -> Self {
247 self.change_types = vec![ChangeType::Update];
248 self
249 }
250
251 pub fn deletes_only(mut self) -> Self {
252 self.change_types = vec![ChangeType::Delete];
253 self
254 }
255
256 pub fn with_columns(mut self, columns: Vec<String>) -> Self {
257 self.columns = Some(columns);
258 self
259 }
260
261 pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
262 self.filter = Some(filter.into());
263 self
264 }
265
266 pub fn should_capture(&self, change_type: ChangeType) -> bool {
267 self.change_types.contains(&change_type)
268 }
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct CdcPosition {
278 pub lsn: Option<u64>,
279 pub timestamp: u64,
280 pub sequence: u64,
281}
282
283impl CdcPosition {
284 pub fn new() -> Self {
285 Self {
286 lsn: None,
287 timestamp: current_timestamp(),
288 sequence: 0,
289 }
290 }
291
292 pub fn with_lsn(mut self, lsn: u64) -> Self {
293 self.lsn = Some(lsn);
294 self
295 }
296
297 pub fn advance(&mut self) {
298 self.sequence += 1;
299 self.timestamp = current_timestamp();
300 }
301}
302
303impl Default for CdcPosition {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch. The
/// millisecond count is a `u128`; it is converted with a checked conversion
/// and saturates at `u64::MAX` instead of wrapping silently.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            // Fully qualified so the call does not depend on `TryFrom` being in the prelude.
            <u64 as std::convert::TryFrom<u128>>::try_from(elapsed.as_millis())
                .unwrap_or(u64::MAX)
        }
        Err(_) => 0,
    }
}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Source shared by the `ChangeEvent` tests.
    fn users_table() -> ChangeSource {
        ChangeSource::new("mydb", "users")
    }

    #[test]
    fn test_change_event_insert() {
        let row = serde_json::json!({"id": 1, "name": "Alice"});

        let inserted = ChangeEvent::insert(users_table(), "1".to_string(), row);

        assert_eq!(inserted.change_type, ChangeType::Insert);
        assert!(inserted.before.is_none());
        assert!(inserted.after.is_some());
    }

    #[test]
    fn test_change_event_update() {
        let old_row = serde_json::json!({"id": 1, "name": "Alice"});
        let new_row = serde_json::json!({"id": 1, "name": "Alice Smith"});

        let updated = ChangeEvent::update(users_table(), "1".to_string(), old_row, new_row);

        assert_eq!(updated.change_type, ChangeType::Update);
        assert!(updated.before.is_some());
        assert!(updated.after.is_some());
    }

    #[test]
    fn test_change_event_to_event() {
        let row = serde_json::json!({"id": 1});

        let published = ChangeEvent::insert(users_table(), "1".to_string(), row).to_event();

        assert_eq!(published.event_type, EventType::Created);
        assert_eq!(published.source, "mydb.users");
    }

    #[test]
    fn test_cdc_source_config() {
        let orders = CdcSourceConfig::new("mydb", "orders")
            .inserts_only()
            .with_filter("amount > 100");

        assert!(orders.should_capture(ChangeType::Insert));
        assert!(!orders.should_capture(ChangeType::Update));
        assert!(orders.filter.is_some());
    }

    #[test]
    fn test_cdc_config() {
        let single_source = CdcSourceConfig::new("db", "table1");
        let cfg = CdcConfig::new()
            .with_source(single_source)
            .with_batch_size(50);

        assert!(cfg.enabled);
        assert_eq!(cfg.sources.len(), 1);
        assert_eq!(cfg.batch_size, 50);
    }

    #[test]
    fn test_cdc_position() {
        let mut cursor = CdcPosition::new();
        assert_eq!(cursor.sequence, 0);

        // Each advance moves the sequence forward by exactly one.
        cursor.advance();
        assert_eq!(cursor.sequence, 1);

        cursor.advance();
        assert_eq!(cursor.sequence, 2);
    }
}