1use crate::table_rules::TableRules;
5use anyhow::{bail, Result};
6use sha2::{Digest, Sha256};
7use tokio_postgres::Client;
8
/// Database- and table-level selection rules applied when deciding what to replicate.
///
/// The include/exclude pairs are mutually exclusive when built through
/// `ReplicationFilter::new`, which rejects setting both sides of a pair.
/// A `None` list imposes no restriction.
#[derive(Debug, Clone, Default)]
pub struct ReplicationFilter {
    /// When set, only databases named here are replicated (exact, case-sensitive match).
    include_databases: Option<Vec<String>>,
    /// When set, databases named here are skipped (exact, case-sensitive match).
    exclude_databases: Option<Vec<String>>,
    /// When set, only tables listed here are replicated; entries are `database.table`.
    include_tables: Option<Vec<String>>,
    /// When set, tables listed here are skipped; entries are `database.table`.
    exclude_tables: Option<Vec<String>>,
    /// Additional per-table rules (schema-only tables, predicates); part of the fingerprint.
    table_rules: TableRules,
}
18
19impl ReplicationFilter {
20 pub fn new(
22 include_databases: Option<Vec<String>>,
23 exclude_databases: Option<Vec<String>>,
24 include_tables: Option<Vec<String>>,
25 exclude_tables: Option<Vec<String>>,
26 ) -> Result<Self> {
27 if include_databases.is_some() && exclude_databases.is_some() {
29 bail!("Cannot use both --include-databases and --exclude-databases");
30 }
31 if include_tables.is_some() && exclude_tables.is_some() {
32 bail!("Cannot use both --include-tables and --exclude-tables");
33 }
34
35 if let Some(ref tables) = include_tables {
37 for table in tables {
38 if !table.contains('.') {
39 bail!(
40 "Table must be specified as 'database.table', got '{}'",
41 table
42 );
43 }
44 }
45 }
46 if let Some(ref tables) = exclude_tables {
47 for table in tables {
48 if !table.contains('.') {
49 bail!(
50 "Table must be specified as 'database.table', got '{}'",
51 table
52 );
53 }
54 }
55 }
56
57 Ok(Self {
58 include_databases,
59 exclude_databases,
60 include_tables,
61 exclude_tables,
62 table_rules: TableRules::default(),
63 })
64 }
65
66 pub fn empty() -> Self {
68 Self::default()
69 }
70
71 pub fn is_empty(&self) -> bool {
73 self.include_databases.is_none()
74 && self.exclude_databases.is_none()
75 && self.include_tables.is_none()
76 && self.exclude_tables.is_none()
77 && self.table_rules.is_empty()
78 }
79
80 pub fn fingerprint(&self) -> String {
82 fn hash_option_list(hasher: &mut Sha256, values: &Option<Vec<String>>) {
83 match values {
84 Some(items) => {
85 let mut sorted = items.clone();
86 sorted.sort();
87 for item in sorted {
88 hasher.update(item.as_bytes());
89 hasher.update(b"|");
90 }
91 }
92 None => hasher.update(b"<none>"),
93 }
94 }
95
96 let mut hasher = Sha256::new();
97 hash_option_list(&mut hasher, &self.include_databases);
98 hasher.update(b"#");
99 hash_option_list(&mut hasher, &self.exclude_databases);
100 hasher.update(b"#");
101 hash_option_list(&mut hasher, &self.include_tables);
102 hasher.update(b"#");
103 hash_option_list(&mut hasher, &self.exclude_tables);
104 hasher.update(b"#");
105 hasher.update(self.table_rules.fingerprint().as_bytes());
106
107 format!("{:x}", hasher.finalize())
108 }
109
110 pub fn table_rules(&self) -> &TableRules {
111 &self.table_rules
112 }
113
114 pub fn with_table_rules(mut self, rules: TableRules) -> Self {
115 self.table_rules = rules;
116 self
117 }
118
119 pub fn schema_only_tables(&self, database: &str) -> Vec<String> {
120 self.table_rules.schema_only_tables(database)
121 }
122
123 pub fn predicate_tables(&self, database: &str) -> Vec<(String, String)> {
124 self.table_rules.predicate_tables(database)
125 }
126
127 pub fn include_databases(&self) -> Option<&Vec<String>> {
129 self.include_databases.as_ref()
130 }
131
132 pub fn exclude_databases(&self) -> Option<&Vec<String>> {
134 self.exclude_databases.as_ref()
135 }
136
137 pub fn include_tables(&self) -> Option<&Vec<String>> {
139 self.include_tables.as_ref()
140 }
141
142 pub fn exclude_tables(&self) -> Option<&Vec<String>> {
144 self.exclude_tables.as_ref()
145 }
146
147 pub fn should_replicate_database(&self, db_name: &str) -> bool {
149 if let Some(ref include) = self.include_databases {
151 if !include.contains(&db_name.to_string()) {
152 return false;
153 }
154 }
155
156 if let Some(ref exclude) = self.exclude_databases {
158 if exclude.contains(&db_name.to_string()) {
159 return false;
160 }
161 }
162
163 true
164 }
165
166 pub fn should_replicate_table(&self, db_name: &str, table_name: &str) -> bool {
168 let full_name = format!("{}.{}", db_name, table_name);
169
170 if let Some(ref include) = self.include_tables {
172 if !include.contains(&full_name) {
173 return false;
174 }
175 }
176
177 if let Some(ref exclude) = self.exclude_tables {
179 if exclude.contains(&full_name) {
180 return false;
181 }
182 }
183
184 true
185 }
186
187 pub async fn get_databases_to_replicate(&self, source_conn: &Client) -> Result<Vec<String>> {
189 let all_databases = crate::migration::schema::list_databases(source_conn).await?;
191
192 let filtered: Vec<String> = all_databases
194 .into_iter()
195 .filter(|db| self.should_replicate_database(&db.name))
196 .map(|db| db.name)
197 .collect();
198
199 if filtered.is_empty() {
200 bail!("No databases selected for replication. Check your filters.");
201 }
202
203 Ok(filtered)
204 }
205
206 pub async fn get_tables_to_replicate(
208 &self,
209 source_conn: &Client,
210 db_name: &str,
211 ) -> Result<Vec<String>> {
212 let all_tables = crate::migration::schema::list_tables(source_conn).await?;
214
215 let filtered: Vec<String> = all_tables
217 .into_iter()
218 .filter(|table| self.should_replicate_table(db_name, &table.name))
219 .map(|table| table.name)
220 .collect();
221
222 Ok(filtered)
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `Some(Vec<String>)` argument shape taken by `ReplicationFilter::new`.
    fn list(items: &[&str]) -> Option<Vec<String>> {
        Some(items.iter().map(|item| item.to_string()).collect())
    }

    /// Asserts that construction failed and that the error text mentions `needle`.
    fn assert_fails_with(result: Result<ReplicationFilter>, needle: &str) {
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(
            message.contains(needle),
            "error {:?} does not mention {:?}",
            message,
            needle
        );
    }

    #[test]
    fn test_new_validates_mutually_exclusive_database_flags() {
        assert_fails_with(
            ReplicationFilter::new(list(&["db1"]), list(&["db2"]), None, None),
            "Cannot use both --include-databases and --exclude-databases",
        );
    }

    #[test]
    fn test_new_validates_mutually_exclusive_table_flags() {
        assert_fails_with(
            ReplicationFilter::new(None, None, list(&["db1.table1"]), list(&["db2.table2"])),
            "Cannot use both --include-tables and --exclude-tables",
        );
    }

    #[test]
    fn test_new_validates_table_format_for_include() {
        assert_fails_with(
            ReplicationFilter::new(None, None, list(&["invalid_table"]), None),
            "Table must be specified as 'database.table'",
        );
    }

    #[test]
    fn test_new_validates_table_format_for_exclude() {
        assert_fails_with(
            ReplicationFilter::new(None, None, None, list(&["invalid_table"])),
            "Table must be specified as 'database.table'",
        );
    }

    #[test]
    fn test_should_replicate_database_with_include_list() {
        let filter = ReplicationFilter::new(list(&["db1", "db2"]), None, None, None).unwrap();

        for &(db, expected) in &[("db1", true), ("db2", true), ("db3", false)] {
            assert_eq!(filter.should_replicate_database(db), expected, "database {}", db);
        }
    }

    #[test]
    fn test_should_replicate_database_with_exclude_list() {
        let filter = ReplicationFilter::new(None, list(&["test", "dev"]), None, None).unwrap();

        for &(db, expected) in &[("production", true), ("test", false), ("dev", false)] {
            assert_eq!(filter.should_replicate_database(db), expected, "database {}", db);
        }
    }

    #[test]
    fn test_should_replicate_table_with_include_list() {
        let filter =
            ReplicationFilter::new(None, None, list(&["db1.users", "db1.orders"]), None).unwrap();

        for &(table, expected) in &[("users", true), ("orders", true), ("logs", false)] {
            assert_eq!(filter.should_replicate_table("db1", table), expected, "table {}", table);
        }
    }

    #[test]
    fn test_should_replicate_table_with_exclude_list() {
        let filter = ReplicationFilter::new(
            None,
            None,
            None,
            list(&["db1.audit_logs", "db1.temp_data"]),
        )
        .unwrap();

        for &(table, expected) in &[("users", true), ("audit_logs", false), ("temp_data", false)] {
            assert_eq!(filter.should_replicate_table("db1", table), expected, "table {}", table);
        }
    }

    #[test]
    fn test_empty_filter_replicates_everything() {
        let everything = ReplicationFilter::empty();

        assert!(everything.is_empty());
        assert!(everything.should_replicate_database("any_db"));
        assert!(everything.should_replicate_table("any_db", "any_table"));
    }

    #[test]
    fn test_is_empty_returns_false_when_include_databases_set() {
        let filter = ReplicationFilter::new(list(&["db1"]), None, None, None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_databases_set() {
        let filter = ReplicationFilter::new(None, list(&["db1"]), None, None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_include_tables_set() {
        let filter = ReplicationFilter::new(None, None, list(&["db1.table1"]), None).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_is_empty_returns_false_when_exclude_tables_set() {
        let filter = ReplicationFilter::new(None, None, None, list(&["db1.table1"])).unwrap();
        assert!(!filter.is_empty());
    }

    #[test]
    fn test_fingerprint_is_order_insensitive() {
        let forward = ReplicationFilter::new(list(&["db1", "db2"]), None, None, None).unwrap();
        let reversed = ReplicationFilter::new(list(&["db2", "db1"]), None, None, None).unwrap();

        assert_eq!(forward.fingerprint(), reversed.fingerprint());
    }

    #[test]
    fn test_fingerprint_differs_for_different_filters() {
        let skip_db1 = ReplicationFilter::new(None, list(&["db1"]), None, None).unwrap();
        let skip_db2 = ReplicationFilter::new(None, list(&["db2"]), None, None).unwrap();

        assert_ne!(skip_db1.fingerprint(), skip_db2.fingerprint());
    }

    #[test]
    fn test_fingerprint_includes_table_rules_schema() {
        use crate::table_rules::TableRules;

        // Builds an otherwise-empty filter whose only rule marks `qualified` schema-only.
        let filter_for = |qualified: &str| {
            let mut rules = TableRules::default();
            rules
                .apply_schema_only_cli(&[qualified.to_string()])
                .unwrap();
            ReplicationFilter::empty().with_table_rules(rules)
        };

        assert_ne!(
            filter_for("public.orders").fingerprint(),
            filter_for("analytics.orders").fingerprint(),
            "Filters with different table rule schemas should produce different fingerprints"
        );
    }
}