Skip to main content

aegis_streaming/
cdc.rs

//! Aegis Streaming CDC (Change Data Capture)
//!
//! Change data capture for tracking database changes (inserts, updates,
//! deletes and truncates) and converting them into generic stream events.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
7
8use crate::event::{Event, EventData, EventType};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13// =============================================================================
14// Change Type
15// =============================================================================
16
17/// Type of data change.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19pub enum ChangeType {
20    Insert,
21    Update,
22    Delete,
23    Truncate,
24}
25
26impl From<ChangeType> for EventType {
27    fn from(ct: ChangeType) -> Self {
28        match ct {
29            ChangeType::Insert => EventType::Created,
30            ChangeType::Update => EventType::Updated,
31            ChangeType::Delete => EventType::Deleted,
32            ChangeType::Truncate => EventType::Custom("truncate".to_string()),
33        }
34    }
35}
36
37// =============================================================================
38// Change Event
39// =============================================================================
40
41/// A change data capture event.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct ChangeEvent {
44    pub change_type: ChangeType,
45    pub source: ChangeSource,
46    pub timestamp: u64,
47    pub key: Option<String>,
48    pub before: Option<serde_json::Value>,
49    pub after: Option<serde_json::Value>,
50    pub metadata: HashMap<String, String>,
51}
52
53impl ChangeEvent {
54    /// Create an insert change event.
55    pub fn insert(source: ChangeSource, key: String, data: serde_json::Value) -> Self {
56        Self {
57            change_type: ChangeType::Insert,
58            source,
59            timestamp: current_timestamp(),
60            key: Some(key),
61            before: None,
62            after: Some(data),
63            metadata: HashMap::new(),
64        }
65    }
66
67    /// Create an update change event.
68    pub fn update(
69        source: ChangeSource,
70        key: String,
71        before: serde_json::Value,
72        after: serde_json::Value,
73    ) -> Self {
74        Self {
75            change_type: ChangeType::Update,
76            source,
77            timestamp: current_timestamp(),
78            key: Some(key),
79            before: Some(before),
80            after: Some(after),
81            metadata: HashMap::new(),
82        }
83    }
84
85    /// Create a delete change event.
86    pub fn delete(source: ChangeSource, key: String, data: serde_json::Value) -> Self {
87        Self {
88            change_type: ChangeType::Delete,
89            source,
90            timestamp: current_timestamp(),
91            key: Some(key),
92            before: Some(data),
93            after: None,
94            metadata: HashMap::new(),
95        }
96    }
97
98    /// Create a truncate change event.
99    pub fn truncate(source: ChangeSource) -> Self {
100        Self {
101            change_type: ChangeType::Truncate,
102            source,
103            timestamp: current_timestamp(),
104            key: None,
105            before: None,
106            after: None,
107            metadata: HashMap::new(),
108        }
109    }
110
111    /// Add metadata to the change event.
112    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
113        self.metadata.insert(key.into(), value.into());
114        self
115    }
116
117    /// Convert to a generic Event.
118    pub fn to_event(&self) -> Event {
119        let source = format!("{}.{}", self.source.database, self.source.table);
120        let data = serde_json::json!({
121            "change_type": format!("{:?}", self.change_type),
122            "key": self.key,
123            "before": self.before,
124            "after": self.after,
125        });
126
127        Event::new(self.change_type.into(), source, EventData::Json(data))
128    }
129}
130
131// =============================================================================
132// Change Source
133// =============================================================================
134
135/// Source of a change event.
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct ChangeSource {
138    pub database: String,
139    pub table: String,
140    pub schema: Option<String>,
141}
142
143impl ChangeSource {
144    pub fn new(database: impl Into<String>, table: impl Into<String>) -> Self {
145        Self {
146            database: database.into(),
147            table: table.into(),
148            schema: None,
149        }
150    }
151
152    pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
153        self.schema = Some(schema.into());
154        self
155    }
156
157    pub fn full_name(&self) -> String {
158        match &self.schema {
159            Some(s) => format!("{}.{}.{}", self.database, s, self.table),
160            None => format!("{}.{}", self.database, self.table),
161        }
162    }
163}
164
165// =============================================================================
166// CDC Configuration
167// =============================================================================
168
169/// Configuration for change data capture.
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct CdcConfig {
172    pub enabled: bool,
173    pub sources: Vec<CdcSourceConfig>,
174    pub batch_size: usize,
175    pub batch_timeout_ms: u64,
176    pub include_before: bool,
177    pub include_schema: bool,
178}
179
180impl Default for CdcConfig {
181    fn default() -> Self {
182        Self {
183            enabled: true,
184            sources: Vec::new(),
185            batch_size: 100,
186            batch_timeout_ms: 1000,
187            include_before: true,
188            include_schema: false,
189        }
190    }
191}
192
193impl CdcConfig {
194    pub fn new() -> Self {
195        Self::default()
196    }
197
198    pub fn add_source(&mut self, source: CdcSourceConfig) {
199        self.sources.push(source);
200    }
201
202    pub fn with_source(mut self, source: CdcSourceConfig) -> Self {
203        self.sources.push(source);
204        self
205    }
206
207    pub fn with_batch_size(mut self, size: usize) -> Self {
208        self.batch_size = size;
209        self
210    }
211
212    pub fn with_batch_timeout(mut self, timeout_ms: u64) -> Self {
213        self.batch_timeout_ms = timeout_ms;
214        self
215    }
216}
217
218// =============================================================================
219// CDC Source Configuration
220// =============================================================================
221
222/// Configuration for a CDC source.
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct CdcSourceConfig {
225    pub source: ChangeSource,
226    pub change_types: Vec<ChangeType>,
227    pub columns: Option<Vec<String>>,
228    pub filter: Option<String>,
229}
230
231impl CdcSourceConfig {
232    pub fn new(database: impl Into<String>, table: impl Into<String>) -> Self {
233        Self {
234            source: ChangeSource::new(database, table),
235            change_types: vec![ChangeType::Insert, ChangeType::Update, ChangeType::Delete],
236            columns: None,
237            filter: None,
238        }
239    }
240
241    pub fn inserts_only(mut self) -> Self {
242        self.change_types = vec![ChangeType::Insert];
243        self
244    }
245
246    pub fn updates_only(mut self) -> Self {
247        self.change_types = vec![ChangeType::Update];
248        self
249    }
250
251    pub fn deletes_only(mut self) -> Self {
252        self.change_types = vec![ChangeType::Delete];
253        self
254    }
255
256    pub fn with_columns(mut self, columns: Vec<String>) -> Self {
257        self.columns = Some(columns);
258        self
259    }
260
261    pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
262        self.filter = Some(filter.into());
263        self
264    }
265
266    pub fn should_capture(&self, change_type: ChangeType) -> bool {
267        self.change_types.contains(&change_type)
268    }
269}
270
271// =============================================================================
272// CDC Tracker
273// =============================================================================
274
275/// Tracks CDC position for resumable streaming.
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct CdcPosition {
278    pub lsn: Option<u64>,
279    pub timestamp: u64,
280    pub sequence: u64,
281}
282
283impl CdcPosition {
284    pub fn new() -> Self {
285        Self {
286            lsn: None,
287            timestamp: current_timestamp(),
288            sequence: 0,
289        }
290    }
291
292    pub fn with_lsn(mut self, lsn: u64) -> Self {
293        self.lsn = Some(lsn);
294        self
295    }
296
297    pub fn advance(&mut self) {
298        self.sequence += 1;
299        self.timestamp = current_timestamp();
300    }
301}
302
303impl Default for CdcPosition {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch. The `u128`
/// millisecond count is clamped to `u64::MAX` rather than silently truncated.
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // After `min`, the value fits in u64, so the cast is lossless.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
315
// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_change_event_insert() {
        let source = ChangeSource::new("mydb", "users");
        let data = serde_json::json!({"id": 1, "name": "Alice"});

        let event = ChangeEvent::insert(source, "1".to_string(), data);

        assert_eq!(event.change_type, ChangeType::Insert);
        assert!(event.before.is_none());
        assert!(event.after.is_some());
    }

    #[test]
    fn test_change_event_update() {
        let source = ChangeSource::new("mydb", "users");
        let before = serde_json::json!({"id": 1, "name": "Alice"});
        let after = serde_json::json!({"id": 1, "name": "Alice Smith"});

        let event = ChangeEvent::update(source, "1".to_string(), before, after);

        assert_eq!(event.change_type, ChangeType::Update);
        assert!(event.before.is_some());
        assert!(event.after.is_some());
    }

    #[test]
    fn test_change_event_delete() {
        let source = ChangeSource::new("mydb", "users");
        let data = serde_json::json!({"id": 1});

        let event = ChangeEvent::delete(source, "1".to_string(), data.clone());

        assert_eq!(event.change_type, ChangeType::Delete);
        assert_eq!(event.key.as_deref(), Some("1"));
        assert_eq!(event.before, Some(data));
        assert!(event.after.is_none());
    }

    #[test]
    fn test_change_event_truncate() {
        let event = ChangeEvent::truncate(ChangeSource::new("mydb", "users"));

        assert_eq!(event.change_type, ChangeType::Truncate);
        assert!(event.key.is_none());
        assert!(event.before.is_none());
        assert!(event.after.is_none());
        assert_eq!(
            EventType::from(ChangeType::Truncate),
            EventType::Custom("truncate".to_string())
        );
    }

    #[test]
    fn test_change_event_metadata_overwrites() {
        let event = ChangeEvent::insert(
            ChangeSource::new("db", "t"),
            "k".to_string(),
            serde_json::json!({}),
        )
        .with_metadata("origin", "replica")
        .with_metadata("origin", "primary");

        assert_eq!(event.metadata.len(), 1);
        assert_eq!(
            event.metadata.get("origin").map(String::as_str),
            Some("primary")
        );
    }

    #[test]
    fn test_change_event_to_event() {
        let source = ChangeSource::new("mydb", "users");
        let data = serde_json::json!({"id": 1});

        let change = ChangeEvent::insert(source, "1".to_string(), data);
        let event = change.to_event();

        assert_eq!(event.event_type, EventType::Created);
        assert_eq!(event.source, "mydb.users");
    }

    #[test]
    fn test_change_source_full_name() {
        let plain = ChangeSource::new("mydb", "users");
        assert_eq!(plain.full_name(), "mydb.users");

        let qualified = plain.with_schema("public");
        assert_eq!(qualified.full_name(), "mydb.public.users");
    }

    #[test]
    fn test_cdc_source_config() {
        let config = CdcSourceConfig::new("mydb", "orders")
            .inserts_only()
            .with_filter("amount > 100");

        assert!(config.should_capture(ChangeType::Insert));
        assert!(!config.should_capture(ChangeType::Update));
        assert!(config.filter.is_some());
    }

    #[test]
    fn test_cdc_source_config_defaults() {
        let config = CdcSourceConfig::new("db", "t");

        assert!(config.should_capture(ChangeType::Insert));
        assert!(config.should_capture(ChangeType::Update));
        assert!(config.should_capture(ChangeType::Delete));
        assert!(!config.should_capture(ChangeType::Truncate));
        assert!(config.columns.is_none());
        assert!(config.filter.is_none());

        let deletes = config.deletes_only();
        assert!(deletes.should_capture(ChangeType::Delete));
        assert!(!deletes.should_capture(ChangeType::Insert));
    }

    #[test]
    fn test_cdc_config() {
        let config = CdcConfig::new()
            .with_source(CdcSourceConfig::new("db", "table1"))
            .with_batch_size(50);

        assert!(config.enabled);
        assert_eq!(config.sources.len(), 1);
        assert_eq!(config.batch_size, 50);
    }

    #[test]
    fn test_cdc_config_defaults() {
        let config = CdcConfig::default();

        assert!(config.enabled);
        assert!(config.sources.is_empty());
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.batch_timeout_ms, 1000);
        assert!(config.include_before);
        assert!(!config.include_schema);
    }

    #[test]
    fn test_cdc_position() {
        let mut position = CdcPosition::new();
        assert_eq!(position.sequence, 0);

        position.advance();
        assert_eq!(position.sequence, 1);

        position.advance();
        assert_eq!(position.sequence, 2);
    }

    #[test]
    fn test_cdc_position_with_lsn() {
        let position = CdcPosition::new().with_lsn(42);

        assert_eq!(position.lsn, Some(42));
        assert_eq!(position.sequence, 0);
    }
}