1use crate::common::pattern::PatternSet;
23use crate::common::CdcEvent;
24use serde::{Deserialize, Serialize};
25use std::collections::{HashMap, HashSet};
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct CdcFilterConfig {
30 #[serde(default)]
32 pub include_tables: Vec<String>,
33
34 #[serde(default)]
36 pub exclude_tables: Vec<String>,
37
38 #[serde(default)]
40 pub table_columns: HashMap<String, TableColumnConfig>,
41
42 #[serde(default)]
44 pub global_exclude_columns: Vec<String>,
45
46 #[serde(default)]
48 pub mask_columns: Vec<String>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct TableColumnConfig {
54 #[serde(default)]
56 pub include: Vec<String>,
57
58 #[serde(default)]
60 pub exclude: Vec<String>,
61
62 #[serde(default)]
64 pub mask: Vec<String>,
65}
66
67impl Default for CdcFilterConfig {
68 fn default() -> Self {
69 Self {
70 include_tables: vec!["*".to_string()], exclude_tables: vec![],
72 table_columns: HashMap::new(),
73 global_exclude_columns: vec![],
74 mask_columns: vec![],
75 }
76 }
77}
78
79pub struct CdcFilter {
81 config: CdcFilterConfig,
82 include_patterns: PatternSet,
83 exclude_patterns: PatternSet,
84 global_exclude_set: HashSet<String>,
85 mask_set: HashSet<String>,
86}
87
88impl CdcFilter {
89 pub fn new(config: CdcFilterConfig) -> Result<Self, crate::common::pattern::PatternError> {
91 let include_patterns = PatternSet::from_patterns(&config.include_tables)?;
92 let exclude_patterns = PatternSet::from_patterns(&config.exclude_tables)?;
93
94 let global_exclude_set: HashSet<String> = config
95 .global_exclude_columns
96 .iter()
97 .map(|s| s.to_lowercase())
98 .collect();
99
100 let mask_set: HashSet<String> = config
101 .mask_columns
102 .iter()
103 .map(|s| s.to_lowercase())
104 .collect();
105
106 Ok(Self {
107 config,
108 include_patterns,
109 exclude_patterns,
110 global_exclude_set,
111 mask_set,
112 })
113 }
114
115 pub fn should_include_table(&self, schema: &str, table: &str) -> bool {
117 if self.exclude_patterns.matches_qualified(schema, table) {
119 return false;
120 }
121
122 if self.include_patterns.matches_qualified(schema, table) {
124 return true;
125 }
126
127 self.include_patterns.is_empty()
129 }
130
131 pub fn filter_event(&self, event: &mut CdcEvent) -> bool {
136 if !self.should_include_table(&event.schema, &event.table) {
138 return false;
139 }
140
141 let table_key = format!("{}.{}", event.schema, event.table);
143 let table_config = self.config.table_columns.get(&table_key);
144
145 if let Some(ref mut before) = event.before {
147 self.filter_json(before, table_config);
148 }
149
150 if let Some(ref mut after) = event.after {
152 self.filter_json(after, table_config);
153 }
154
155 true
156 }
157
158 fn filter_json(&self, value: &mut serde_json::Value, table_config: Option<&TableColumnConfig>) {
160 if let serde_json::Value::Object(map) = value {
161 let mut to_remove = Vec::new();
163 let mut to_mask = Vec::new();
164
165 for key in map.keys() {
166 let key_lower = key.to_lowercase();
167
168 if self.global_exclude_set.contains(&key_lower) {
170 to_remove.push(key.clone());
171 continue;
172 }
173
174 if self.mask_set.contains(&key_lower) {
176 to_mask.push(key.clone());
177 continue;
178 }
179
180 if let Some(tc) = table_config {
182 if tc.exclude.iter().any(|e| e.to_lowercase() == key_lower) {
184 to_remove.push(key.clone());
185 continue;
186 }
187
188 if !tc.include.is_empty()
190 && !tc.include.iter().any(|i| i.to_lowercase() == key_lower)
191 {
192 to_remove.push(key.clone());
193 continue;
194 }
195
196 if tc.mask.iter().any(|m| m.to_lowercase() == key_lower) {
198 to_mask.push(key.clone());
199 }
200 }
201 }
202
203 for key in to_remove {
205 map.remove(&key);
206 }
207
208 for key in to_mask {
210 if let Some(v) = map.get_mut(&key) {
211 *v = serde_json::Value::String("***REDACTED***".to_string());
212 }
213 }
214 }
215 }
216
217 pub fn config(&self) -> &CdcFilterConfig {
219 &self.config
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::CdcOp;

    /// Builds an `Insert` event for `mydb.public.<table>` carrying only an after-image.
    fn insert_into(table: &str, after: serde_json::Value) -> CdcEvent {
        CdcEvent {
            source_type: "postgres".into(),
            database: "mydb".into(),
            schema: "public".into(),
            table: table.into(),
            op: CdcOp::Insert,
            before: None,
            after: Some(after),
            timestamp: 0,
            transaction: None,
        }
    }

    #[test]
    fn test_table_include_all() {
        let filter = CdcFilter::new(CdcFilterConfig::default()).unwrap();

        for &(schema, table) in &[("public", "users"), ("schema", "orders")] {
            assert!(filter.should_include_table(schema, table));
        }
    }

    #[test]
    fn test_table_include_pattern() {
        let filter = CdcFilter::new(CdcFilterConfig {
            include_tables: vec![String::from("public.*")],
            ..CdcFilterConfig::default()
        })
        .unwrap();

        assert!(filter.should_include_table("public", "users"));
        assert!(filter.should_include_table("public", "orders"));
        assert!(!filter.should_include_table("private", "secrets"));
    }

    #[test]
    fn test_table_exclude() {
        let filter = CdcFilter::new(CdcFilterConfig {
            include_tables: vec![String::from("*")],
            exclude_tables: vec![String::from("*.audit_log"), String::from("private.*")],
            ..CdcFilterConfig::default()
        })
        .unwrap();

        assert!(filter.should_include_table("public", "users"));
        assert!(!filter.should_include_table("public", "audit_log"));
        assert!(!filter.should_include_table("private", "secrets"));
    }

    #[test]
    fn test_column_filtering() {
        let filter = CdcFilter::new(CdcFilterConfig {
            global_exclude_columns: vec![String::from("password"), String::from("ssn")],
            ..CdcFilterConfig::default()
        })
        .unwrap();

        let mut event = insert_into(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "password": "secret123",
                "ssn": "123-45-6789"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        for column in &["id", "name"] {
            assert!(after.get(*column).is_some());
        }
        for column in &["password", "ssn"] {
            assert!(after.get(*column).is_none());
        }
    }

    #[test]
    fn test_column_masking() {
        let filter = CdcFilter::new(CdcFilterConfig {
            mask_columns: vec![String::from("email"), String::from("phone")],
            ..CdcFilterConfig::default()
        })
        .unwrap();

        let mut event = insert_into(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "email": "alice@example.com",
                "phone": "+1-555-1234"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        for column in &["email", "phone"] {
            assert_eq!(after.get(*column).unwrap(), "***REDACTED***");
        }
        assert_eq!(after.get("name").unwrap(), "Alice");
    }

    #[test]
    fn test_table_specific_columns() {
        let users_rules = TableColumnConfig {
            include: vec![String::from("id"), String::from("name")],
            exclude: Vec::new(),
            mask: Vec::new(),
        };
        let mut table_columns = HashMap::new();
        table_columns.insert(String::from("public.users"), users_rules);

        let filter = CdcFilter::new(CdcFilterConfig {
            table_columns,
            ..CdcFilterConfig::default()
        })
        .unwrap();

        let mut event = insert_into(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "email": "alice@example.com",
                "created_at": "2024-01-01"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        for column in &["id", "name"] {
            assert!(after.get(*column).is_some());
        }
        for column in &["email", "created_at"] {
            assert!(after.get(*column).is_none());
        }
    }

    #[test]
    fn test_filter_rejects_excluded_table() {
        let filter = CdcFilter::new(CdcFilterConfig {
            exclude_tables: vec![String::from("audit_log")],
            ..CdcFilterConfig::default()
        })
        .unwrap();

        let mut event = insert_into("audit_log", serde_json::json!({"id": 1}));

        assert!(!filter.filter_event(&mut event));
    }
}