database_replicator/
filters.rs

1// ABOUTME: Central filtering logic for selective replication
2// ABOUTME: Handles database and table include/exclude patterns
3
4use crate::table_rules::TableRules;
5use anyhow::{bail, Result};
6use sha2::{Digest, Sha256};
7use tokio_postgres::Client;
8
9/// Represents replication filtering rules
10#[derive(Debug, Clone, Default)]
11pub struct ReplicationFilter {
12    include_databases: Option<Vec<String>>,
13    exclude_databases: Option<Vec<String>>,
14    include_tables: Option<Vec<String>>, // Format: "db.table"
15    exclude_tables: Option<Vec<String>>, // Format: "db.table"
16    table_rules: TableRules,
17}
18
19impl ReplicationFilter {
20    /// Creates a filter from CLI arguments
21    pub fn new(
22        include_databases: Option<Vec<String>>,
23        exclude_databases: Option<Vec<String>>,
24        include_tables: Option<Vec<String>>,
25        exclude_tables: Option<Vec<String>>,
26    ) -> Result<Self> {
27        // Validate mutually exclusive flags
28        if include_databases.is_some() && exclude_databases.is_some() {
29            bail!("Cannot use both --include-databases and --exclude-databases");
30        }
31        if include_tables.is_some() && exclude_tables.is_some() {
32            bail!("Cannot use both --include-tables and --exclude-tables");
33        }
34
35        // Validate table format (must be "database.table")
36        if let Some(ref tables) = include_tables {
37            for table in tables {
38                if !table.contains('.') {
39                    bail!(
40                        "Table must be specified as 'database.table', got '{}'",
41                        table
42                    );
43                }
44            }
45        }
46        if let Some(ref tables) = exclude_tables {
47            for table in tables {
48                if !table.contains('.') {
49                    bail!(
50                        "Table must be specified as 'database.table', got '{}'",
51                        table
52                    );
53                }
54            }
55        }
56
57        Ok(Self {
58            include_databases,
59            exclude_databases,
60            include_tables,
61            exclude_tables,
62            table_rules: TableRules::default(),
63        })
64    }
65
66    /// Creates an empty filter (replicate everything)
67    pub fn empty() -> Self {
68        Self::default()
69    }
70
71    /// Checks if any filters are active
72    pub fn is_empty(&self) -> bool {
73        self.include_databases.is_none()
74            && self.exclude_databases.is_none()
75            && self.include_tables.is_none()
76            && self.exclude_tables.is_none()
77            && self.table_rules.is_empty()
78    }
79
80    /// Returns a stable fingerprint for the filter configuration
81    pub fn fingerprint(&self) -> String {
82        fn hash_option_list(hasher: &mut Sha256, values: &Option<Vec<String>>) {
83            match values {
84                Some(items) => {
85                    let mut sorted = items.clone();
86                    sorted.sort();
87                    for item in sorted {
88                        hasher.update(item.as_bytes());
89                        hasher.update(b"|");
90                    }
91                }
92                None => hasher.update(b"<none>"),
93            }
94        }
95
96        let mut hasher = Sha256::new();
97        hash_option_list(&mut hasher, &self.include_databases);
98        hasher.update(b"#");
99        hash_option_list(&mut hasher, &self.exclude_databases);
100        hasher.update(b"#");
101        hash_option_list(&mut hasher, &self.include_tables);
102        hasher.update(b"#");
103        hash_option_list(&mut hasher, &self.exclude_tables);
104        hasher.update(b"#");
105        hasher.update(self.table_rules.fingerprint().as_bytes());
106
107        format!("{:x}", hasher.finalize())
108    }
109
110    pub fn table_rules(&self) -> &TableRules {
111        &self.table_rules
112    }
113
114    pub fn with_table_rules(mut self, rules: TableRules) -> Self {
115        self.table_rules = rules;
116        self
117    }
118
119    pub fn schema_only_tables(&self, database: &str) -> Vec<String> {
120        self.table_rules.schema_only_tables(database)
121    }
122
123    pub fn predicate_tables(&self, database: &str) -> Vec<(String, String)> {
124        self.table_rules.predicate_tables(database)
125    }
126
127    /// Gets the list of databases to include
128    pub fn include_databases(&self) -> Option<&Vec<String>> {
129        self.include_databases.as_ref()
130    }
131
132    /// Gets the list of databases to exclude
133    pub fn exclude_databases(&self) -> Option<&Vec<String>> {
134        self.exclude_databases.as_ref()
135    }
136
137    /// Gets the list of tables to include
138    pub fn include_tables(&self) -> Option<&Vec<String>> {
139        self.include_tables.as_ref()
140    }
141
142    /// Gets the list of tables to exclude
143    pub fn exclude_tables(&self) -> Option<&Vec<String>> {
144        self.exclude_tables.as_ref()
145    }
146
147    /// Gets the explicit list of databases to check/replicate
148    ///
149    /// Returns databases from:
150    /// 1. include_databases if specified, OR
151    /// 2. database names extracted from include_tables if specified
152    ///
153    /// Returns None if no explicit database list can be determined
154    /// (meaning all databases should be enumerated).
155    pub fn databases_to_check(&self) -> Option<Vec<String>> {
156        if let Some(ref include) = self.include_databases {
157            return Some(include.clone());
158        }
159
160        if let Some(ref include_tables) = self.include_tables {
161            // Extract unique database names from "database.table" format
162            let mut databases: Vec<String> = include_tables
163                .iter()
164                .filter_map(|table| table.split('.').next().map(String::from))
165                .collect();
166            databases.sort();
167            databases.dedup();
168            if !databases.is_empty() {
169                return Some(databases);
170            }
171        }
172
173        None
174    }
175
176    /// Determines if a database should be replicated
177    ///
178    /// A database is replicated if:
179    /// 1. It's in the include_databases list (if specified), OR
180    /// 2. It's referenced in include_tables (if specified and no include_databases), OR
181    /// 3. No include filters are specified (replicate all)
182    ///
183    /// AND it's not in the exclude_databases list.
184    pub fn should_replicate_database(&self, db_name: &str) -> bool {
185        // If include_databases list exists, database must be in it
186        if let Some(ref include) = self.include_databases {
187            if !include.contains(&db_name.to_string()) {
188                return false;
189            }
190        } else if let Some(ref include_tables) = self.include_tables {
191            // If include_tables is specified but include_databases is not,
192            // only replicate databases referenced in include_tables
193            let db_referenced = include_tables
194                .iter()
195                .any(|table| table.split('.').next() == Some(db_name));
196            if !db_referenced {
197                return false;
198            }
199        }
200
201        // If exclude list exists, database must not be in it
202        if let Some(ref exclude) = self.exclude_databases {
203            if exclude.contains(&db_name.to_string()) {
204                return false;
205            }
206        }
207
208        true
209    }
210
211    /// Determines if a table should be replicated
212    pub fn should_replicate_table(&self, db_name: &str, table_name: &str) -> bool {
213        let full_name = format!("{}.{}", db_name, table_name);
214
215        // If include list exists, table must be in it
216        if let Some(ref include) = self.include_tables {
217            if !include.contains(&full_name) {
218                return false;
219            }
220        }
221
222        // If exclude list exists, table must not be in it
223        if let Some(ref exclude) = self.exclude_tables {
224            if exclude.contains(&full_name) {
225                return false;
226            }
227        }
228
229        true
230    }
231
232    /// Gets list of databases to replicate (queries source if needed)
233    pub async fn get_databases_to_replicate(&self, source_conn: &Client) -> Result<Vec<String>> {
234        // Get all databases from source
235        let all_databases = crate::migration::schema::list_databases(source_conn).await?;
236
237        // Filter based on rules
238        let filtered: Vec<String> = all_databases
239            .into_iter()
240            .filter(|db| self.should_replicate_database(&db.name))
241            .map(|db| db.name)
242            .collect();
243
244        if filtered.is_empty() {
245            bail!("No databases selected for replication. Check your filters.");
246        }
247
248        Ok(filtered)
249    }
250
251    /// Gets list of tables to replicate for a given database
252    pub async fn get_tables_to_replicate(
253        &self,
254        source_conn: &Client,
255        db_name: &str,
256    ) -> Result<Vec<String>> {
257        // Get all tables from the database
258        let all_tables = crate::migration::schema::list_tables(source_conn).await?;
259
260        // Filter based on rules
261        let filtered: Vec<String> = all_tables
262            .into_iter()
263            .filter(|table| self.should_replicate_table(db_name, &table.name))
264            .map(|table| table.name)
265            .collect();
266
267        Ok(filtered)
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned `Some(Vec<String>)` filter list from string literals.
    fn list(items: &[&str]) -> Option<Vec<String>> {
        Some(items.iter().map(|item| item.to_string()).collect())
    }

    // --- constructor validation -------------------------------------------

    #[test]
    fn test_new_validates_mutually_exclusive_database_flags() {
        let error = ReplicationFilter::new(list(&["db1"]), list(&["db2"]), None, None)
            .expect_err("both database lists must be rejected");

        assert!(error
            .to_string()
            .contains("Cannot use both --include-databases and --exclude-databases"));
    }

    #[test]
    fn test_new_validates_mutually_exclusive_table_flags() {
        let error =
            ReplicationFilter::new(None, None, list(&["db1.table1"]), list(&["db2.table2"]))
                .expect_err("both table lists must be rejected");

        assert!(error
            .to_string()
            .contains("Cannot use both --include-tables and --exclude-tables"));
    }

    #[test]
    fn test_new_validates_table_format_for_include() {
        let error = ReplicationFilter::new(None, None, list(&["invalid_table"]), None)
            .expect_err("unqualified include table must be rejected");

        assert!(error
            .to_string()
            .contains("Table must be specified as 'database.table'"));
    }

    #[test]
    fn test_new_validates_table_format_for_exclude() {
        let error = ReplicationFilter::new(None, None, None, list(&["invalid_table"]))
            .expect_err("unqualified exclude table must be rejected");

        assert!(error
            .to_string()
            .contains("Table must be specified as 'database.table'"));
    }

    // --- database decisions -----------------------------------------------

    #[test]
    fn test_should_replicate_database_with_include_list() {
        let filter = ReplicationFilter::new(list(&["db1", "db2"]), None, None, None).unwrap();

        assert!(filter.should_replicate_database("db1"));
        assert!(filter.should_replicate_database("db2"));
        assert!(!filter.should_replicate_database("db3"));
    }

    #[test]
    fn test_should_replicate_database_with_exclude_list() {
        let filter = ReplicationFilter::new(None, list(&["test", "dev"]), None, None).unwrap();

        assert!(filter.should_replicate_database("production"));
        assert!(!filter.should_replicate_database("test"));
        assert!(!filter.should_replicate_database("dev"));
    }

    // --- table decisions --------------------------------------------------

    #[test]
    fn test_should_replicate_table_with_include_list() {
        let filter =
            ReplicationFilter::new(None, None, list(&["db1.users", "db1.orders"]), None).unwrap();

        assert!(filter.should_replicate_table("db1", "users"));
        assert!(filter.should_replicate_table("db1", "orders"));
        assert!(!filter.should_replicate_table("db1", "logs"));
    }

    #[test]
    fn test_should_replicate_table_with_exclude_list() {
        let filter =
            ReplicationFilter::new(None, None, None, list(&["db1.audit_logs", "db1.temp_data"]))
                .unwrap();

        assert!(filter.should_replicate_table("db1", "users"));
        assert!(!filter.should_replicate_table("db1", "audit_logs"));
        assert!(!filter.should_replicate_table("db1", "temp_data"));
    }

    // --- emptiness --------------------------------------------------------

    #[test]
    fn test_empty_filter_replicates_everything() {
        let filter = ReplicationFilter::empty();

        assert!(filter.is_empty());
        assert!(filter.should_replicate_database("any_db"));
        assert!(filter.should_replicate_table("any_db", "any_table"));
    }

    #[test]
    fn test_is_empty_returns_false_when_include_databases_set() {
        let filter = ReplicationFilter::new(list(&["db1"]), None, None, None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_databases_set() {
        let filter = ReplicationFilter::new(None, list(&["db1"]), None, None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_include_tables_set() {
        let filter = ReplicationFilter::new(None, None, list(&["db1.table1"]), None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_tables_set() {
        let filter = ReplicationFilter::new(None, None, None, list(&["db1.table1"])).unwrap();
        assert!(!filter.is_empty());
    }

    // --- fingerprints -----------------------------------------------------

    #[test]
    fn test_fingerprint_is_order_insensitive() {
        let forward = ReplicationFilter::new(list(&["db1", "db2"]), None, None, None).unwrap();
        let reversed = ReplicationFilter::new(list(&["db2", "db1"]), None, None, None).unwrap();

        assert_eq!(forward.fingerprint(), reversed.fingerprint());
    }

    #[test]
    fn test_fingerprint_differs_for_different_filters() {
        let first = ReplicationFilter::new(None, list(&["db1"]), None, None).unwrap();
        let second = ReplicationFilter::new(None, list(&["db2"]), None, None).unwrap();

        assert_ne!(first.fingerprint(), second.fingerprint());
    }

    #[test]
    fn test_fingerprint_includes_table_rules_schema() {
        use crate::table_rules::TableRules;

        // Two rule sets that differ only in the schema-only entry they carry.
        let mut public_rules = TableRules::default();
        public_rules
            .apply_schema_only_cli(&["public.orders".to_string()])
            .unwrap();

        let mut analytics_rules = TableRules::default();
        analytics_rules
            .apply_schema_only_cli(&["analytics.orders".to_string()])
            .unwrap();

        let public_filter = ReplicationFilter::empty().with_table_rules(public_rules);
        let analytics_filter = ReplicationFilter::empty().with_table_rules(analytics_rules);

        assert_ne!(
            public_filter.fingerprint(),
            analytics_filter.fingerprint(),
            "Filters with different table rule schemas should produce different fingerprints"
        );
    }
}