database_replicator/
filters.rs

// ABOUTME: Central filtering logic for selective replication
// ABOUTME: Handles database and table include/exclude lists (exact-name matching)
3
4use crate::table_rules::TableRules;
5use anyhow::{bail, Result};
6use sha2::{Digest, Sha256};
7use tokio_postgres::Client;
8
9/// Represents replication filtering rules
10#[derive(Debug, Clone, Default)]
11pub struct ReplicationFilter {
12    include_databases: Option<Vec<String>>,
13    exclude_databases: Option<Vec<String>>,
14    include_tables: Option<Vec<String>>, // Format: "db.table"
15    exclude_tables: Option<Vec<String>>, // Format: "db.table"
16    table_rules: TableRules,
17}
18
19impl ReplicationFilter {
20    /// Creates a filter from CLI arguments
21    pub fn new(
22        include_databases: Option<Vec<String>>,
23        exclude_databases: Option<Vec<String>>,
24        include_tables: Option<Vec<String>>,
25        exclude_tables: Option<Vec<String>>,
26    ) -> Result<Self> {
27        // Validate mutually exclusive flags
28        if include_databases.is_some() && exclude_databases.is_some() {
29            bail!("Cannot use both --include-databases and --exclude-databases");
30        }
31        if include_tables.is_some() && exclude_tables.is_some() {
32            bail!("Cannot use both --include-tables and --exclude-tables");
33        }
34
35        // Validate table format (must be "database.table")
36        if let Some(ref tables) = include_tables {
37            for table in tables {
38                if !table.contains('.') {
39                    bail!(
40                        "Table must be specified as 'database.table', got '{}'",
41                        table
42                    );
43                }
44            }
45        }
46        if let Some(ref tables) = exclude_tables {
47            for table in tables {
48                if !table.contains('.') {
49                    bail!(
50                        "Table must be specified as 'database.table', got '{}'",
51                        table
52                    );
53                }
54            }
55        }
56
57        Ok(Self {
58            include_databases,
59            exclude_databases,
60            include_tables,
61            exclude_tables,
62            table_rules: TableRules::default(),
63        })
64    }
65
66    /// Creates an empty filter (replicate everything)
67    pub fn empty() -> Self {
68        Self::default()
69    }
70
71    /// Checks if any filters are active
72    pub fn is_empty(&self) -> bool {
73        self.include_databases.is_none()
74            && self.exclude_databases.is_none()
75            && self.include_tables.is_none()
76            && self.exclude_tables.is_none()
77            && self.table_rules.is_empty()
78    }
79
80    /// Returns a stable fingerprint for the filter configuration
81    pub fn fingerprint(&self) -> String {
82        fn hash_option_list(hasher: &mut Sha256, values: &Option<Vec<String>>) {
83            match values {
84                Some(items) => {
85                    let mut sorted = items.clone();
86                    sorted.sort();
87                    for item in sorted {
88                        hasher.update(item.as_bytes());
89                        hasher.update(b"|");
90                    }
91                }
92                None => hasher.update(b"<none>"),
93            }
94        }
95
96        let mut hasher = Sha256::new();
97        hash_option_list(&mut hasher, &self.include_databases);
98        hasher.update(b"#");
99        hash_option_list(&mut hasher, &self.exclude_databases);
100        hasher.update(b"#");
101        hash_option_list(&mut hasher, &self.include_tables);
102        hasher.update(b"#");
103        hash_option_list(&mut hasher, &self.exclude_tables);
104        hasher.update(b"#");
105        hasher.update(self.table_rules.fingerprint().as_bytes());
106
107        format!("{:x}", hasher.finalize())
108    }
109
110    pub fn table_rules(&self) -> &TableRules {
111        &self.table_rules
112    }
113
114    pub fn with_table_rules(mut self, rules: TableRules) -> Self {
115        self.table_rules = rules;
116        self
117    }
118
119    pub fn schema_only_tables(&self, database: &str) -> Vec<String> {
120        self.table_rules.schema_only_tables(database)
121    }
122
123    pub fn predicate_tables(&self, database: &str) -> Vec<(String, String)> {
124        self.table_rules.predicate_tables(database)
125    }
126
127    /// Gets the list of databases to include
128    pub fn include_databases(&self) -> Option<&Vec<String>> {
129        self.include_databases.as_ref()
130    }
131
132    /// Gets the list of databases to exclude
133    pub fn exclude_databases(&self) -> Option<&Vec<String>> {
134        self.exclude_databases.as_ref()
135    }
136
137    /// Gets the list of tables to include
138    pub fn include_tables(&self) -> Option<&Vec<String>> {
139        self.include_tables.as_ref()
140    }
141
142    /// Gets the list of tables to exclude
143    pub fn exclude_tables(&self) -> Option<&Vec<String>> {
144        self.exclude_tables.as_ref()
145    }
146
147    /// Determines if a database should be replicated
148    pub fn should_replicate_database(&self, db_name: &str) -> bool {
149        // If include list exists, database must be in it
150        if let Some(ref include) = self.include_databases {
151            if !include.contains(&db_name.to_string()) {
152                return false;
153            }
154        }
155
156        // If exclude list exists, database must not be in it
157        if let Some(ref exclude) = self.exclude_databases {
158            if exclude.contains(&db_name.to_string()) {
159                return false;
160            }
161        }
162
163        true
164    }
165
166    /// Determines if a table should be replicated
167    pub fn should_replicate_table(&self, db_name: &str, table_name: &str) -> bool {
168        let full_name = format!("{}.{}", db_name, table_name);
169
170        // If include list exists, table must be in it
171        if let Some(ref include) = self.include_tables {
172            if !include.contains(&full_name) {
173                return false;
174            }
175        }
176
177        // If exclude list exists, table must not be in it
178        if let Some(ref exclude) = self.exclude_tables {
179            if exclude.contains(&full_name) {
180                return false;
181            }
182        }
183
184        true
185    }
186
187    /// Gets list of databases to replicate (queries source if needed)
188    pub async fn get_databases_to_replicate(&self, source_conn: &Client) -> Result<Vec<String>> {
189        // Get all databases from source
190        let all_databases = crate::migration::schema::list_databases(source_conn).await?;
191
192        // Filter based on rules
193        let filtered: Vec<String> = all_databases
194            .into_iter()
195            .filter(|db| self.should_replicate_database(&db.name))
196            .map(|db| db.name)
197            .collect();
198
199        if filtered.is_empty() {
200            bail!("No databases selected for replication. Check your filters.");
201        }
202
203        Ok(filtered)
204    }
205
206    /// Gets list of tables to replicate for a given database
207    pub async fn get_tables_to_replicate(
208        &self,
209        source_conn: &Client,
210        db_name: &str,
211    ) -> Result<Vec<String>> {
212        // Get all tables from the database
213        let all_tables = crate::migration::schema::list_tables(source_conn).await?;
214
215        // Filter based on rules
216        let filtered: Vec<String> = all_tables
217            .into_iter()
218            .filter(|table| self.should_replicate_table(db_name, &table.name))
219            .map(|table| table.name)
220            .collect();
221
222        Ok(filtered)
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts string literals into the owned `Vec<String>` the filter API takes.
    fn strs(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Asserts that building a filter failed with a message containing `needle`.
    fn assert_err_contains(result: anyhow::Result<ReplicationFilter>, needle: &str) {
        let err = result.expect_err("expected ReplicationFilter::new to fail");
        let message = err.to_string();
        assert!(
            message.contains(needle),
            "error {:?} does not contain {:?}",
            message,
            needle
        );
    }

    #[test]
    fn test_new_validates_mutually_exclusive_database_flags() {
        let result =
            ReplicationFilter::new(Some(strs(&["db1"])), Some(strs(&["db2"])), None, None);
        assert_err_contains(
            result,
            "Cannot use both --include-databases and --exclude-databases",
        );
    }

    #[test]
    fn test_new_validates_mutually_exclusive_table_flags() {
        let result = ReplicationFilter::new(
            None,
            None,
            Some(strs(&["db1.table1"])),
            Some(strs(&["db2.table2"])),
        );
        assert_err_contains(result, "Cannot use both --include-tables and --exclude-tables");
    }

    #[test]
    fn test_new_validates_table_format_for_include() {
        let result = ReplicationFilter::new(None, None, Some(strs(&["invalid_table"])), None);
        assert_err_contains(result, "Table must be specified as 'database.table'");
    }

    #[test]
    fn test_new_validates_table_format_for_exclude() {
        let result = ReplicationFilter::new(None, None, None, Some(strs(&["invalid_table"])));
        assert_err_contains(result, "Table must be specified as 'database.table'");
    }

    #[test]
    fn test_should_replicate_database_with_include_list() {
        let filter = ReplicationFilter::new(Some(strs(&["db1", "db2"])), None, None, None)
            .expect("an include list on its own is valid");

        for wanted in ["db1", "db2"] {
            assert!(
                filter.should_replicate_database(wanted),
                "{} should be included",
                wanted
            );
        }
        assert!(!filter.should_replicate_database("db3"));
    }

    #[test]
    fn test_should_replicate_database_with_exclude_list() {
        let filter = ReplicationFilter::new(None, Some(strs(&["test", "dev"])), None, None)
            .expect("an exclude list on its own is valid");

        assert!(filter.should_replicate_database("production"));
        for skipped in ["test", "dev"] {
            assert!(
                !filter.should_replicate_database(skipped),
                "{} should be excluded",
                skipped
            );
        }
    }

    #[test]
    fn test_should_replicate_table_with_include_list() {
        let filter = ReplicationFilter::new(
            None,
            None,
            Some(strs(&["db1.users", "db1.orders"])),
            None,
        )
        .expect("a table include list on its own is valid");

        for table in ["users", "orders"] {
            assert!(
                filter.should_replicate_table("db1", table),
                "db1.{} should be included",
                table
            );
        }
        assert!(!filter.should_replicate_table("db1", "logs"));
    }

    #[test]
    fn test_should_replicate_table_with_exclude_list() {
        let filter = ReplicationFilter::new(
            None,
            None,
            None,
            Some(strs(&["db1.audit_logs", "db1.temp_data"])),
        )
        .expect("a table exclude list on its own is valid");

        assert!(filter.should_replicate_table("db1", "users"));
        for table in ["audit_logs", "temp_data"] {
            assert!(
                !filter.should_replicate_table("db1", table),
                "db1.{} should be excluded",
                table
            );
        }
    }

    #[test]
    fn test_empty_filter_replicates_everything() {
        let everything = ReplicationFilter::empty();

        assert!(everything.is_empty());
        assert!(everything.should_replicate_database("any_db"));
        assert!(everything.should_replicate_table("any_db", "any_table"));
    }

    #[test]
    fn test_is_empty_returns_false_when_include_databases_set() {
        let filter = ReplicationFilter::new(Some(strs(&["db1"])), None, None, None)
            .expect("valid filter");
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_databases_set() {
        let filter = ReplicationFilter::new(None, Some(strs(&["db1"])), None, None)
            .expect("valid filter");
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_include_tables_set() {
        let filter = ReplicationFilter::new(None, None, Some(strs(&["db1.table1"])), None)
            .expect("valid filter");
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_tables_set() {
        let filter = ReplicationFilter::new(None, None, None, Some(strs(&["db1.table1"])))
            .expect("valid filter");
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_fingerprint_is_order_insensitive() {
        let include_only = |dbs: &[&str]| {
            ReplicationFilter::new(Some(strs(dbs)), None, None, None).expect("valid filter")
        };

        let forward = include_only(&["db1", "db2"]);
        let backward = include_only(&["db2", "db1"]);

        assert_eq!(forward.fingerprint(), backward.fingerprint());
    }

    #[test]
    fn test_fingerprint_differs_for_different_filters() {
        let exclude_only = |dbs: &[&str]| {
            ReplicationFilter::new(None, Some(strs(dbs)), None, None).expect("valid filter")
        };

        let first = exclude_only(&["db1"]);
        let second = exclude_only(&["db2"]);

        assert_ne!(first.fingerprint(), second.fingerprint());
    }

    #[test]
    fn test_fingerprint_includes_table_rules_schema() {
        use crate::table_rules::TableRules;

        // The same table name in two different schemas must not share a fingerprint.
        let filter_with_schema_only = |spec: &str| {
            let mut rules = TableRules::default();
            rules.apply_schema_only_cli(&[spec.to_string()]).unwrap();
            ReplicationFilter::empty().with_table_rules(rules)
        };

        let filter_a = filter_with_schema_only("public.orders");
        let filter_b = filter_with_schema_only("analytics.orders");

        assert_ne!(
            filter_a.fingerprint(),
            filter_b.fingerprint(),
            "Filters with different table rule schemas should produce different fingerprints"
        );
    }
}