1use crate::table_rules::TableRules;
5use anyhow::{bail, Result};
6use sha2::{Digest, Sha256};
7use tokio_postgres::Client;
8
9#[derive(Debug, Clone, Default)]
11pub struct ReplicationFilter {
12 include_databases: Option<Vec<String>>,
13 exclude_databases: Option<Vec<String>>,
14 include_tables: Option<Vec<String>>, exclude_tables: Option<Vec<String>>, table_rules: TableRules,
17}
18
19impl ReplicationFilter {
20 pub fn new(
22 include_databases: Option<Vec<String>>,
23 exclude_databases: Option<Vec<String>>,
24 include_tables: Option<Vec<String>>,
25 exclude_tables: Option<Vec<String>>,
26 ) -> Result<Self> {
27 if include_databases.is_some() && exclude_databases.is_some() {
29 bail!("Cannot use both --include-databases and --exclude-databases");
30 }
31 if include_tables.is_some() && exclude_tables.is_some() {
32 bail!("Cannot use both --include-tables and --exclude-tables");
33 }
34
35 if let Some(ref tables) = include_tables {
37 for table in tables {
38 if !table.contains('.') {
39 bail!(
40 "Table must be specified as 'database.table', got '{}'",
41 table
42 );
43 }
44 }
45 }
46 if let Some(ref tables) = exclude_tables {
47 for table in tables {
48 if !table.contains('.') {
49 bail!(
50 "Table must be specified as 'database.table', got '{}'",
51 table
52 );
53 }
54 }
55 }
56
57 Ok(Self {
58 include_databases,
59 exclude_databases,
60 include_tables,
61 exclude_tables,
62 table_rules: TableRules::default(),
63 })
64 }
65
66 pub fn empty() -> Self {
68 Self::default()
69 }
70
71 pub fn is_empty(&self) -> bool {
73 self.include_databases.is_none()
74 && self.exclude_databases.is_none()
75 && self.include_tables.is_none()
76 && self.exclude_tables.is_none()
77 && self.table_rules.is_empty()
78 }
79
80 pub fn fingerprint(&self) -> String {
82 fn hash_option_list(hasher: &mut Sha256, values: &Option<Vec<String>>) {
83 match values {
84 Some(items) => {
85 let mut sorted = items.clone();
86 sorted.sort();
87 for item in sorted {
88 hasher.update(item.as_bytes());
89 hasher.update(b"|");
90 }
91 }
92 None => hasher.update(b"<none>"),
93 }
94 }
95
96 let mut hasher = Sha256::new();
97 hash_option_list(&mut hasher, &self.include_databases);
98 hasher.update(b"#");
99 hash_option_list(&mut hasher, &self.exclude_databases);
100 hasher.update(b"#");
101 hash_option_list(&mut hasher, &self.include_tables);
102 hasher.update(b"#");
103 hash_option_list(&mut hasher, &self.exclude_tables);
104 hasher.update(b"#");
105 hasher.update(self.table_rules.fingerprint().as_bytes());
106
107 format!("{:x}", hasher.finalize())
108 }
109
110 pub fn table_rules(&self) -> &TableRules {
111 &self.table_rules
112 }
113
114 pub fn with_table_rules(mut self, rules: TableRules) -> Self {
115 self.table_rules = rules;
116 self
117 }
118
119 pub fn schema_only_tables(&self, database: &str) -> Vec<String> {
120 self.table_rules.schema_only_tables(database)
121 }
122
123 pub fn predicate_tables(&self, database: &str) -> Vec<(String, String)> {
124 self.table_rules.predicate_tables(database)
125 }
126
127 pub fn include_databases(&self) -> Option<&Vec<String>> {
129 self.include_databases.as_ref()
130 }
131
132 pub fn exclude_databases(&self) -> Option<&Vec<String>> {
134 self.exclude_databases.as_ref()
135 }
136
137 pub fn include_tables(&self) -> Option<&Vec<String>> {
139 self.include_tables.as_ref()
140 }
141
142 pub fn exclude_tables(&self) -> Option<&Vec<String>> {
144 self.exclude_tables.as_ref()
145 }
146
147 pub fn databases_to_check(&self) -> Option<Vec<String>> {
156 if let Some(ref include) = self.include_databases {
157 return Some(include.clone());
158 }
159
160 if let Some(ref include_tables) = self.include_tables {
161 let mut databases: Vec<String> = include_tables
163 .iter()
164 .filter_map(|table| table.split('.').next().map(String::from))
165 .collect();
166 databases.sort();
167 databases.dedup();
168 if !databases.is_empty() {
169 return Some(databases);
170 }
171 }
172
173 None
174 }
175
176 pub fn should_replicate_database(&self, db_name: &str) -> bool {
185 if let Some(ref include) = self.include_databases {
187 if !include.contains(&db_name.to_string()) {
188 return false;
189 }
190 } else if let Some(ref include_tables) = self.include_tables {
191 let db_referenced = include_tables
194 .iter()
195 .any(|table| table.split('.').next() == Some(db_name));
196 if !db_referenced {
197 return false;
198 }
199 }
200
201 if let Some(ref exclude) = self.exclude_databases {
203 if exclude.contains(&db_name.to_string()) {
204 return false;
205 }
206 }
207
208 true
209 }
210
211 pub fn should_replicate_table(&self, db_name: &str, table_name: &str) -> bool {
213 let full_name = format!("{}.{}", db_name, table_name);
214
215 if let Some(ref include) = self.include_tables {
217 if !include.contains(&full_name) {
218 return false;
219 }
220 }
221
222 if let Some(ref exclude) = self.exclude_tables {
224 if exclude.contains(&full_name) {
225 return false;
226 }
227 }
228
229 true
230 }
231
232 pub async fn get_databases_to_replicate(&self, source_conn: &Client) -> Result<Vec<String>> {
234 let all_databases = crate::migration::schema::list_databases(source_conn).await?;
236
237 let filtered: Vec<String> = all_databases
239 .into_iter()
240 .filter(|db| self.should_replicate_database(&db.name))
241 .map(|db| db.name)
242 .collect();
243
244 if filtered.is_empty() {
245 bail!("No databases selected for replication. Check your filters.");
246 }
247
248 Ok(filtered)
249 }
250
251 pub async fn get_tables_to_replicate(
253 &self,
254 source_conn: &Client,
255 db_name: &str,
256 ) -> Result<Vec<String>> {
257 let all_tables = crate::migration::schema::list_tables(source_conn).await?;
259
260 let filtered: Vec<String> = all_tables
262 .into_iter()
263 .filter(|table| self.should_replicate_table(db_name, &table.name))
264 .map(|table| table.name)
265 .collect();
266
267 Ok(filtered)
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into the owned list type the constructor takes.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Fails the test unless `result` is an error whose message contains `fragment`.
    fn assert_error_contains(result: Result<ReplicationFilter>, fragment: &str) {
        let message = match result {
            Ok(_) => panic!("expected an error containing {:?}, got Ok", fragment),
            Err(error) => error.to_string(),
        };
        assert!(
            message.contains(fragment),
            "error {:?} does not contain {:?}",
            message,
            fragment
        );
    }

    // --- constructor validation ---

    #[test]
    fn test_new_validates_mutually_exclusive_database_flags() {
        let outcome =
            ReplicationFilter::new(Some(owned(&["db1"])), Some(owned(&["db2"])), None, None);
        assert_error_contains(
            outcome,
            "Cannot use both --include-databases and --exclude-databases",
        );
    }

    #[test]
    fn test_new_validates_mutually_exclusive_table_flags() {
        let outcome = ReplicationFilter::new(
            None,
            None,
            Some(owned(&["db1.table1"])),
            Some(owned(&["db2.table2"])),
        );
        assert_error_contains(
            outcome,
            "Cannot use both --include-tables and --exclude-tables",
        );
    }

    #[test]
    fn test_new_validates_table_format_for_include() {
        let outcome = ReplicationFilter::new(None, None, Some(owned(&["invalid_table"])), None);
        assert_error_contains(outcome, "Table must be specified as 'database.table'");
    }

    #[test]
    fn test_new_validates_table_format_for_exclude() {
        let outcome = ReplicationFilter::new(None, None, None, Some(owned(&["invalid_table"])));
        assert_error_contains(outcome, "Table must be specified as 'database.table'");
    }

    // --- database-level decisions ---

    #[test]
    fn test_should_replicate_database_with_include_list() {
        let filter = ReplicationFilter::new(Some(owned(&["db1", "db2"])), None, None, None)
            .unwrap();

        assert!(filter.should_replicate_database("db1"));
        assert!(filter.should_replicate_database("db2"));
        assert!(!filter.should_replicate_database("db3"));
    }

    #[test]
    fn test_should_replicate_database_with_exclude_list() {
        let filter = ReplicationFilter::new(None, Some(owned(&["test", "dev"])), None, None)
            .unwrap();

        assert!(filter.should_replicate_database("production"));
        assert!(!filter.should_replicate_database("test"));
        assert!(!filter.should_replicate_database("dev"));
    }

    // --- table-level decisions ---

    #[test]
    fn test_should_replicate_table_with_include_list() {
        let filter =
            ReplicationFilter::new(None, None, Some(owned(&["db1.users", "db1.orders"])), None)
                .unwrap();

        assert!(filter.should_replicate_table("db1", "users"));
        assert!(filter.should_replicate_table("db1", "orders"));
        assert!(!filter.should_replicate_table("db1", "logs"));
    }

    #[test]
    fn test_should_replicate_table_with_exclude_list() {
        let filter = ReplicationFilter::new(
            None,
            None,
            None,
            Some(owned(&["db1.audit_logs", "db1.temp_data"])),
        )
        .unwrap();

        assert!(filter.should_replicate_table("db1", "users"));
        assert!(!filter.should_replicate_table("db1", "audit_logs"));
        assert!(!filter.should_replicate_table("db1", "temp_data"));
    }

    // --- emptiness ---

    #[test]
    fn test_empty_filter_replicates_everything() {
        let filter = ReplicationFilter::empty();

        assert!(filter.is_empty());
        assert!(filter.should_replicate_database("any_db"));
        assert!(filter.should_replicate_table("any_db", "any_table"));
    }

    #[test]
    fn test_is_empty_returns_false_when_include_databases_set() {
        let filter = ReplicationFilter::new(Some(owned(&["db1"])), None, None, None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_databases_set() {
        let filter = ReplicationFilter::new(None, Some(owned(&["db1"])), None, None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_include_tables_set() {
        let filter =
            ReplicationFilter::new(None, None, Some(owned(&["db1.table1"])), None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_tables_set() {
        let filter =
            ReplicationFilter::new(None, None, None, Some(owned(&["db1.table1"]))).unwrap();
        assert!(!filter.is_empty());
    }

    // --- fingerprinting ---

    #[test]
    fn test_fingerprint_is_order_insensitive() {
        let forward =
            ReplicationFilter::new(Some(owned(&["db1", "db2"])), None, None, None).unwrap();
        let reversed =
            ReplicationFilter::new(Some(owned(&["db2", "db1"])), None, None, None).unwrap();

        assert_eq!(forward.fingerprint(), reversed.fingerprint());
    }

    #[test]
    fn test_fingerprint_differs_for_different_filters() {
        let first = ReplicationFilter::new(None, Some(owned(&["db1"])), None, None).unwrap();
        let second = ReplicationFilter::new(None, Some(owned(&["db2"])), None, None).unwrap();

        assert_ne!(first.fingerprint(), second.fingerprint());
    }

    #[test]
    fn test_fingerprint_includes_table_rules_schema() {
        use crate::table_rules::TableRules;

        let mut public_rules = TableRules::default();
        public_rules
            .apply_schema_only_cli(&["public.orders".to_string()])
            .unwrap();

        let mut analytics_rules = TableRules::default();
        analytics_rules
            .apply_schema_only_cli(&["analytics.orders".to_string()])
            .unwrap();

        let with_public = ReplicationFilter::empty().with_table_rules(public_rules);
        let with_analytics = ReplicationFilter::empty().with_table_rules(analytics_rules);

        assert_ne!(
            with_public.fingerprint(),
            with_analytics.fingerprint(),
            "Filters with different table rule schemas should produce different fingerprints"
        );
    }
}