Skip to main content

rivven_cdc/common/
filter.rs

//! Table and column filtering for CDC events
//!
//! Provides production-grade filtering capabilities:
//! - Include/exclude tables by pattern (glob syntax)
//! - Include/exclude columns per table
//! - Regex-based matching
//! - Column masking for sensitive data (PII redaction)
//!
//! # Example
//!
//! ```rust
//! use rivven_cdc::CdcFilterConfig;
//!
//! let config = CdcFilterConfig {
//!     include_tables: vec!["public.*".to_string()],
//!     exclude_tables: vec!["*.audit_log".to_string()],
//!     mask_columns: vec!["password".to_string(), "ssn".to_string()],
//!     ..Default::default()
//! };
//! ```
21
22use crate::common::pattern::PatternSet;
23use crate::common::CdcEvent;
24use serde::{Deserialize, Serialize};
25use std::collections::{HashMap, HashSet};
26
27/// Filter configuration for CDC tables and columns
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct CdcFilterConfig {
30    /// Tables to include (supports glob patterns like "public.*")
31    #[serde(default)]
32    pub include_tables: Vec<String>,
33
34    /// Tables to exclude (evaluated after includes)
35    #[serde(default)]
36    pub exclude_tables: Vec<String>,
37
38    /// Per-table column configuration
39    #[serde(default)]
40    pub table_columns: HashMap<String, TableColumnConfig>,
41
42    /// Global column excludes (applied to all tables)
43    #[serde(default)]
44    pub global_exclude_columns: Vec<String>,
45
46    /// Columns to mask (show as "***REDACTED***")
47    #[serde(default)]
48    pub mask_columns: Vec<String>,
49}
50
51/// Column configuration for a specific table
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct TableColumnConfig {
54    /// Columns to include (if empty, include all)
55    #[serde(default)]
56    pub include: Vec<String>,
57
58    /// Columns to exclude
59    #[serde(default)]
60    pub exclude: Vec<String>,
61
62    /// Columns to mask with "***REDACTED***"
63    #[serde(default)]
64    pub mask: Vec<String>,
65}
66
67impl Default for CdcFilterConfig {
68    fn default() -> Self {
69        Self {
70            include_tables: vec!["*".to_string()], // Include all by default
71            exclude_tables: vec![],
72            table_columns: HashMap::new(),
73            global_exclude_columns: vec![],
74            mask_columns: vec![],
75        }
76    }
77}
78
79/// Compiled filter for efficient runtime evaluation
80pub struct CdcFilter {
81    config: CdcFilterConfig,
82    include_patterns: PatternSet,
83    exclude_patterns: PatternSet,
84    global_exclude_set: HashSet<String>,
85    mask_set: HashSet<String>,
86}
87
88impl CdcFilter {
89    /// Create a new filter from configuration
90    pub fn new(config: CdcFilterConfig) -> Result<Self, crate::common::pattern::PatternError> {
91        let include_patterns = PatternSet::from_patterns(&config.include_tables)?;
92        let exclude_patterns = PatternSet::from_patterns(&config.exclude_tables)?;
93
94        let global_exclude_set: HashSet<String> = config
95            .global_exclude_columns
96            .iter()
97            .map(|s| s.to_lowercase())
98            .collect();
99
100        let mask_set: HashSet<String> = config
101            .mask_columns
102            .iter()
103            .map(|s| s.to_lowercase())
104            .collect();
105
106        Ok(Self {
107            config,
108            include_patterns,
109            exclude_patterns,
110            global_exclude_set,
111            mask_set,
112        })
113    }
114
115    /// Check if a table should be included
116    pub fn should_include_table(&self, schema: &str, table: &str) -> bool {
117        // Check excludes first (higher priority)
118        if self.exclude_patterns.matches_qualified(schema, table) {
119            return false;
120        }
121
122        // Check includes
123        if self.include_patterns.matches_qualified(schema, table) {
124            return true;
125        }
126
127        // Default: include if no include patterns specified
128        self.include_patterns.is_empty()
129    }
130
131    /// Filter and transform a CDC event
132    ///
133    /// Returns true if the event should be processed, false if filtered out.
134    /// Modifies the event in place to remove/mask columns.
135    pub fn filter_event(&self, event: &mut CdcEvent) -> bool {
136        // Table filter
137        if !self.should_include_table(&event.schema, &event.table) {
138            return false;
139        }
140
141        // Column filtering
142        let table_key = format!("{}.{}", event.schema, event.table);
143        let table_config = self.config.table_columns.get(&table_key);
144
145        // Filter 'before' payload
146        if let Some(ref mut before) = event.before {
147            self.filter_json(before, table_config);
148        }
149
150        // Filter 'after' payload
151        if let Some(ref mut after) = event.after {
152            self.filter_json(after, table_config);
153        }
154
155        true
156    }
157
158    /// Filter and mask columns in a JSON object
159    fn filter_json(&self, value: &mut serde_json::Value, table_config: Option<&TableColumnConfig>) {
160        if let serde_json::Value::Object(map) = value {
161            // Collect columns to remove
162            let mut to_remove = Vec::new();
163            let mut to_mask = Vec::new();
164
165            for key in map.keys() {
166                let key_lower = key.to_lowercase();
167
168                // Global excludes
169                if self.global_exclude_set.contains(&key_lower) {
170                    to_remove.push(key.clone());
171                    continue;
172                }
173
174                // Global masks
175                if self.mask_set.contains(&key_lower) {
176                    to_mask.push(key.clone());
177                    continue;
178                }
179
180                // Table-specific config
181                if let Some(tc) = table_config {
182                    // Check table excludes
183                    if tc.exclude.iter().any(|e| e.to_lowercase() == key_lower) {
184                        to_remove.push(key.clone());
185                        continue;
186                    }
187
188                    // Check table includes (if specified, only include those)
189                    if !tc.include.is_empty()
190                        && !tc.include.iter().any(|i| i.to_lowercase() == key_lower)
191                    {
192                        to_remove.push(key.clone());
193                        continue;
194                    }
195
196                    // Check table masks
197                    if tc.mask.iter().any(|m| m.to_lowercase() == key_lower) {
198                        to_mask.push(key.clone());
199                    }
200                }
201            }
202
203            // Remove excluded columns
204            for key in to_remove {
205                map.remove(&key);
206            }
207
208            // Mask sensitive columns
209            for key in to_mask {
210                if let Some(v) = map.get_mut(&key) {
211                    *v = serde_json::Value::String("***REDACTED***".to_string());
212                }
213            }
214        }
215    }
216
217    /// Get the filter configuration
218    pub fn config(&self) -> &CdcFilterConfig {
219        &self.config
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::CdcOp;

    /// Builds an insert event for `public.<table>` with the given `after`
    /// payload and no `before` payload.
    ///
    /// The operation type plays no role in filtering; `Insert` is used for
    /// every event in this module.
    fn insert_event(table: &str, after: serde_json::Value) -> CdcEvent {
        CdcEvent {
            source_type: "postgres".into(),
            database: "mydb".into(),
            schema: "public".into(),
            table: table.into(),
            op: CdcOp::Insert,
            before: None,
            after: Some(after),
            timestamp: 0,
            transaction: None,
        }
    }

    #[test]
    fn test_table_include_all() {
        let config = CdcFilterConfig::default();
        let filter = CdcFilter::new(config).unwrap();

        assert!(filter.should_include_table("public", "users"));
        assert!(filter.should_include_table("schema", "orders"));
    }

    #[test]
    fn test_table_include_pattern() {
        let config = CdcFilterConfig {
            include_tables: vec!["public.*".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        assert!(filter.should_include_table("public", "users"));
        assert!(filter.should_include_table("public", "orders"));
        assert!(!filter.should_include_table("private", "secrets"));
    }

    #[test]
    fn test_table_exclude() {
        let config = CdcFilterConfig {
            include_tables: vec!["*".to_string()],
            exclude_tables: vec!["*.audit_log".to_string(), "private.*".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        assert!(filter.should_include_table("public", "users"));
        assert!(!filter.should_include_table("public", "audit_log"));
        assert!(!filter.should_include_table("private", "secrets"));
    }

    #[test]
    fn test_column_filtering() {
        let config = CdcFilterConfig {
            global_exclude_columns: vec!["password".to_string(), "ssn".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "password": "secret123",
                "ssn": "123-45-6789"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        assert!(after.get("id").is_some());
        assert!(after.get("name").is_some());
        assert!(after.get("password").is_none());
        assert!(after.get("ssn").is_none());
    }

    #[test]
    fn test_column_masking() {
        let config = CdcFilterConfig {
            mask_columns: vec!["email".to_string(), "phone".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "email": "alice@example.com",
                "phone": "+1-555-1234"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        assert_eq!(after.get("email").unwrap(), "***REDACTED***");
        assert_eq!(after.get("phone").unwrap(), "***REDACTED***");
        assert_eq!(after.get("name").unwrap(), "Alice");
    }

    #[test]
    fn test_table_specific_columns() {
        let mut table_columns = HashMap::new();
        table_columns.insert(
            "public.users".to_string(),
            TableColumnConfig {
                include: vec!["id".to_string(), "name".to_string()],
                exclude: vec![],
                mask: vec![],
            },
        );

        let config = CdcFilterConfig {
            table_columns,
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "email": "alice@example.com",
                "created_at": "2024-01-01"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        assert!(after.get("id").is_some());
        assert!(after.get("name").is_some());
        assert!(after.get("email").is_none()); // Not in include list
        assert!(after.get("created_at").is_none()); // Not in include list
    }

    #[test]
    fn test_table_specific_exclude_and_mask() {
        let mut table_columns = HashMap::new();
        table_columns.insert(
            "public.users".to_string(),
            TableColumnConfig {
                include: vec![],
                exclude: vec!["email".to_string()],
                mask: vec!["name".to_string()],
            },
        );

        let config = CdcFilterConfig {
            table_columns,
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event(
            "users",
            serde_json::json!({
                "id": 1,
                "name": "Alice",
                "email": "alice@example.com"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        assert!(after.get("id").is_some());
        assert!(after.get("email").is_none());
        assert_eq!(after.get("name").unwrap(), "***REDACTED***");
    }

    #[test]
    fn test_column_matching_is_case_insensitive() {
        let config = CdcFilterConfig {
            global_exclude_columns: vec!["PASSWORD".to_string()],
            mask_columns: vec!["Email".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event(
            "users",
            serde_json::json!({
                "id": 1,
                "Password": "secret123",
                "EMAIL": "alice@example.com"
            }),
        );

        assert!(filter.filter_event(&mut event));

        let after = event.after.as_ref().unwrap();
        assert!(after.get("id").is_some());
        assert!(after.get("Password").is_none());
        // The key keeps its original spelling; only the value is replaced.
        assert_eq!(after.get("EMAIL").unwrap(), "***REDACTED***");
    }

    #[test]
    fn test_before_payload_is_filtered() {
        let config = CdcFilterConfig {
            global_exclude_columns: vec!["password".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event("users", serde_json::json!({"id": 1, "password": "new"}));
        event.before = Some(serde_json::json!({"id": 1, "password": "old"}));

        assert!(filter.filter_event(&mut event));

        let before = event.before.as_ref().unwrap();
        assert!(before.get("id").is_some());
        assert!(before.get("password").is_none());

        let after = event.after.as_ref().unwrap();
        assert!(after.get("id").is_some());
        assert!(after.get("password").is_none());
    }

    #[test]
    fn test_filter_rejects_excluded_table() {
        let config = CdcFilterConfig {
            exclude_tables: vec!["audit_log".to_string()],
            ..Default::default()
        };
        let filter = CdcFilter::new(config).unwrap();

        let mut event = insert_event("audit_log", serde_json::json!({"id": 1}));

        assert!(!filter.filter_event(&mut event));
    }
}