database_replicator/migration/dump.rs

// ABOUTME: Wrapper for the pg_dump and pg_dumpall commands to export database objects
// ABOUTME: Handles global objects, schema, and data export

use crate::filters::ReplicationFilter;
use anyhow::{Context, Result};
use std::collections::BTreeSet;
use std::fs;
use std::process::{Command, Stdio};
use std::time::Duration;

/// Dump global objects (roles, tablespaces) using pg_dumpall
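///
/// For illustration, the command assembled below is roughly the following (a sketch
/// only; host, port, user, and output path are placeholders, and SSL/keepalive
/// settings are passed via environment variables rather than flags):
///
/// ```text
/// pg_dumpall --globals-only --no-role-passwords --verbose \
///     --host <host> --port <port> --database <dbname> \
///     --username <user> --file=<output_path>
/// ```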
pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
    tracing::info!("Dumping global objects to {}", output_path);

    // Parse URL and create .pgpass file for secure authentication
    let parts = crate::utils::parse_postgres_url(source_url)
        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
    let pgpass = crate::utils::PgPassFile::new(&parts)
        .context("Failed to create .pgpass file for authentication")?;

    let env_vars = parts.to_pg_env_vars();
    let output_path_owned = output_path.to_string();

    // Wrap subprocess execution with retry logic
    crate::utils::retry_subprocess_with_backoff(
        || {
            let mut cmd = Command::new("pg_dumpall");
            cmd.arg("--globals-only")
                .arg("--no-role-passwords") // Don't dump passwords
                .arg("--verbose") // Show progress
                .arg("--host")
                .arg(&parts.host)
                .arg("--port")
                .arg(parts.port.to_string())
                .arg("--database")
                .arg(&parts.database)
                .arg(format!("--file={}", output_path_owned))
                .env("PGPASSFILE", pgpass.path())
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit());

            // Add username if specified
            if let Some(user) = &parts.user {
                cmd.arg("--username").arg(user);
            }

            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
            for (env_var, value) in &env_vars {
                cmd.env(env_var, value);
            }

            // Apply TCP keepalive parameters to prevent idle connection timeouts
            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
                cmd.env(env_var, value);
            }

            cmd.status().context(
                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
                 Install with:\n\
                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
                 - macOS: brew install postgresql\n\
                 - RHEL/CentOS: sudo yum install postgresql",
            )
        },
        3,                      // Max 3 retries
        Duration::from_secs(1), // Start with 1 second delay
        "pg_dumpall (dump globals)",
    )
    .await
    .context(
        "pg_dumpall failed to dump global objects.\n\
         \n\
         Common causes:\n\
         - Connection authentication failed\n\
         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
         - Network connectivity issues or connection timeouts\n\
         - Invalid connection string",
    )?;

    tracing::info!("✓ Global objects dumped successfully");
    Ok(())
}

/// Update a globals dump so duplicate role creation errors become harmless notices.
///
/// `pg_dumpall --globals-only` emits `CREATE ROLE` statements that fail if the
/// role already exists on the target cluster. When an operator reruns
/// replication against the same target, those statements cause `psql` to exit
/// with status 3, which previously triggered noisy retries and prevented the rest
/// of the globals from being applied. By wrapping each `CREATE ROLE` statement
/// in a `DO $$ ... EXCEPTION WHEN duplicate_object` block, we allow Postgres to
/// skip recreating existing roles while still applying subsequent `ALTER ROLE`
/// and `GRANT` statements.
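///
/// For example, a line like the following (role name is illustrative):
///
/// ```text
/// CREATE ROLE replicator WITH LOGIN;
/// ```
///
/// is rewritten into a guarded block along these lines:
///
/// ```text
/// DO $$
/// BEGIN
///     CREATE ROLE replicator WITH LOGIN;
/// EXCEPTION
///     WHEN duplicate_object THEN
///         RAISE NOTICE 'Role replicator already exists on target, skipping CREATE ROLE';
/// END $$;
/// ```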
pub fn sanitize_globals_dump(path: &str) -> Result<()> {
    let content = fs::read_to_string(path)
        .with_context(|| format!("Failed to read globals dump at {}", path))?;

    if let Some(updated) = rewrite_create_role_statements(&content) {
        fs::write(path, updated)
            .with_context(|| format!("Failed to update globals dump at {}", path))?;
    }

    Ok(())
}

/// Comments out `ALTER ROLE ... SUPERUSER` statements in a globals dump file.
///
/// Managed Postgres services (e.g., AWS RDS) often prevent the restore user
/// from granting SUPERUSER. Commenting out those lines keeps the restore
/// moving without permission errors.
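///
/// For example, a line such as `ALTER ROLE admin WITH SUPERUSER;` (role name
/// illustrative) becomes `-- ALTER ROLE admin WITH SUPERUSER;`, which psql
/// then skips as a comment.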
pub fn remove_superuser_from_globals(path: &str) -> Result<()> {
    let content = fs::read_to_string(path)
        .with_context(|| format!("Failed to read globals dump at {}", path))?;

    let mut updated = String::with_capacity(content.len());
    let mut modified = false;
    for line in content.lines() {
        if line.contains("ALTER ROLE") && line.contains("SUPERUSER") {
            updated.push_str("-- ");
            updated.push_str(line);
            updated.push('\n');
            modified = true;
        } else {
            updated.push_str(line);
            updated.push('\n');
        }
    }

    if modified {
        fs::write(path, updated)
            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
    }

    Ok(())
}

/// Removes parameter settings that require superuser privileges (e.g. `log_statement`).
///
/// AWS RDS prevents standard replication roles from altering certain GUCs via
/// `ALTER ROLE ... SET`. Each offending line is commented out so `psql` skips
/// them without aborting the rest of the globals restore.
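///
/// For example, a line such as `ALTER ROLE app SET log_statement TO 'all';`
/// (role name illustrative) is rewritten to
/// `-- ALTER ROLE app SET log_statement TO 'all';`.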
pub fn remove_restricted_guc_settings(path: &str) -> Result<()> {
    const BLOCKED_PATTERNS: &[&str] = &["set log_statement"];

    let content = fs::read_to_string(path)
        .with_context(|| format!("Failed to read globals dump at {}", path))?;

    let mut updated = String::with_capacity(content.len());
    let mut modified = false;

    for line in content.lines() {
        let trimmed = line.trim_start();
        if trimmed.starts_with("--") {
            updated.push_str(line);
            updated.push('\n');
            continue;
        }

        let lower = line.to_ascii_lowercase();
        let is_blocked = BLOCKED_PATTERNS
            .iter()
            .any(|pattern| lower.contains(pattern));

        if is_blocked {
            updated.push_str("-- ");
            updated.push_str(line);
            updated.push('\n');
            modified = true;
        } else {
            updated.push_str(line);
            updated.push('\n');
        }
    }

    if modified {
        fs::write(path, updated)
            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
    }

    Ok(())
}

/// Scans `sql` line by line and wraps every `CREATE ROLE` statement in a
/// duplicate-tolerant `DO` block. Returns `None` when no line needed rewriting.
fn rewrite_create_role_statements(sql: &str) -> Option<String> {
    if sql.is_empty() {
        return None;
    }

    let mut output = String::with_capacity(sql.len() + 1024);
    let mut modified = false;
    let mut cursor = 0;

    while cursor < sql.len() {
        if let Some(rel_pos) = sql[cursor..].find('\n') {
            let end = cursor + rel_pos + 1;
            let chunk = &sql[cursor..end];
            if let Some(transformed) = wrap_create_role_line(chunk) {
                output.push_str(&transformed);
                modified = true;
            } else {
                output.push_str(chunk);
            }
            cursor = end;
        } else {
            let chunk = &sql[cursor..];
            if let Some(transformed) = wrap_create_role_line(chunk) {
                output.push_str(&transformed);
                modified = true;
            } else {
                output.push_str(chunk);
            }
            break;
        }
    }

    if modified {
        Some(output)
    } else {
        None
    }
}

/// Wraps a single `CREATE ROLE ...;` line in a `DO $$ ... END $$;` block that
/// downgrades `duplicate_object` errors to a `RAISE NOTICE`, preserving the
/// line's leading whitespace and line ending. Returns `None` for other lines.
fn wrap_create_role_line(chunk: &str) -> Option<String> {
    let trimmed = chunk.trim_start();
    if !trimmed.starts_with("CREATE ROLE ") {
        return None;
    }

    let statement = trimmed.trim_end();
    let statement_body = statement.trim_end_matches(';').trim_end();
    let leading_ws_len = chunk.len() - trimmed.len();
    let leading_ws = &chunk[..leading_ws_len];
    let newline = if chunk.ends_with("\r\n") {
        "\r\n"
    } else if chunk.ends_with('\n') {
        "\n"
    } else {
        ""
    };

    let role_token = extract_role_token(statement_body)?;

    let notice_name = escape_single_quotes(&unquote_role_name(&role_token));

    let mut block = String::with_capacity(chunk.len() + 128);
    block.push_str(leading_ws);
    block.push_str("DO $$\n");
    block.push_str(leading_ws);
    block.push_str("BEGIN\n");
    block.push_str(leading_ws);
    block.push_str("    ");
    block.push_str(statement_body);
    block.push_str(";\n");
    block.push_str(leading_ws);
    block.push_str("EXCEPTION\n");
    block.push_str(leading_ws);
    block.push_str("    WHEN duplicate_object THEN\n");
    block.push_str(leading_ws);
    block.push_str("        RAISE NOTICE 'Role ");
    block.push_str(&notice_name);
    block.push_str(" already exists on target, skipping CREATE ROLE';\n");
    block.push_str(leading_ws);
    block.push_str("END $$;");

    if !newline.is_empty() {
        block.push_str(newline);
    }

    Some(block)
}

/// Extracts the role name token (quoted or unquoted) that follows `CREATE ROLE`.
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if remainder.starts_with('"') {
        let mut idx = 1;
        let bytes = remainder.as_bytes();
        while idx < bytes.len() {
            if bytes[idx] == b'"' {
                if idx + 1 < bytes.len() && bytes[idx + 1] == b'"' {
                    idx += 2;
                    continue;
                } else {
                    idx += 1;
                    break;
                }
            }
            idx += 1;
        }
        if idx <= remainder.len() {
            return Some(remainder[..idx].to_string());
        }
        None
    } else {
        let mut end = remainder.len();
        for (i, ch) in remainder.char_indices() {
            if ch.is_whitespace() || ch == ';' {
                end = i;
                break;
            }
        }
        if end == 0 {
            None
        } else {
            Some(remainder[..end].to_string())
        }
    }
}

/// Strips surrounding double quotes from a quoted identifier and collapses
/// doubled quotes (`""`) back to a single quote.
fn unquote_role_name(token: &str) -> String {
    if token.starts_with('"') && token.ends_with('"') && token.len() >= 2 {
        let inner = &token[1..token.len() - 1];
        inner.replace("\"\"", "\"")
    } else {
        token.to_string()
    }
}

/// Doubles single quotes so the value can be embedded safely in a SQL string literal.
fn escape_single_quotes(value: &str) -> String {
    value.replace('\'', "''")
}

/// Dump schema (DDL) for a specific database
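///
/// For illustration, the command assembled below is roughly the following (a sketch
/// only; connection details are placeholders, and table include/exclude flags are
/// appended from the filter):
///
/// ```text
/// pg_dump --schema-only --no-owner --no-privileges --verbose \
///     [--exclude-table <t>]... [--table <t>]... \
///     --host <host> --port <port> --dbname <dbname> \
///     --username <user> --file=<output_path>
/// ```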
pub async fn dump_schema(
    source_url: &str,
    database: &str,
    output_path: &str,
    filter: &ReplicationFilter,
) -> Result<()> {
    tracing::info!(
        "Dumping schema for database '{}' to {}",
        database,
        output_path
    );

    // Parse URL and create .pgpass file for secure authentication
    let parts = crate::utils::parse_postgres_url(source_url)
        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
    let pgpass = crate::utils::PgPassFile::new(&parts)
        .context("Failed to create .pgpass file for authentication")?;

    let env_vars = parts.to_pg_env_vars();
    let output_path_owned = output_path.to_string();

    // Collect filter options
    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
    let include_tables = get_included_tables_for_db(filter, database);

    // Wrap subprocess execution with retry logic
    crate::utils::retry_subprocess_with_backoff(
        || {
            let mut cmd = Command::new("pg_dump");
            cmd.arg("--schema-only")
                .arg("--no-owner") // Don't include ownership commands
                .arg("--no-privileges") // We'll handle privileges separately
                .arg("--verbose"); // Show progress

            // Add table filtering if specified
            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
            if let Some(ref exclude) = exclude_tables {
                if !exclude.is_empty() {
                    for table in exclude {
                        cmd.arg("--exclude-table").arg(table);
                    }
                }
            }

            // If include_tables is specified, only dump those tables
            if let Some(ref include) = include_tables {
                if !include.is_empty() {
                    for table in include {
                        cmd.arg("--table").arg(table);
                    }
                }
            }

            cmd.arg("--host")
                .arg(&parts.host)
                .arg("--port")
                .arg(parts.port.to_string())
                .arg("--dbname")
                .arg(&parts.database)
                .arg(format!("--file={}", output_path_owned))
                .env("PGPASSFILE", pgpass.path())
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit());

            // Add username if specified
            if let Some(user) = &parts.user {
                cmd.arg("--username").arg(user);
            }

            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
            for (env_var, value) in &env_vars {
                cmd.env(env_var, value);
            }

            // Apply TCP keepalive parameters to prevent idle connection timeouts
            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
                cmd.env(env_var, value);
            }

            cmd.status().context(
                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
                 Install with:\n\
                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
                 - macOS: brew install postgresql\n\
                 - RHEL/CentOS: sudo yum install postgresql",
            )
        },
        3,                      // Max 3 retries
        Duration::from_secs(1), // Start with 1 second delay
        "pg_dump (dump schema)",
    )
    .await
    .with_context(|| {
        format!(
            "pg_dump failed to dump schema for database '{}'.\n\
             \n\
             Common causes:\n\
             - Database does not exist\n\
             - Connection authentication failed\n\
             - User lacks privileges to read database schema\n\
             - Network connectivity issues or connection timeouts",
            database
        )
    })?;

    tracing::info!("✓ Schema dumped successfully");
    Ok(())
}

/// Dump data for a specific database using optimized directory format
///
/// Uses PostgreSQL directory format dump with:
/// - Parallel dumps for faster performance
/// - Maximum compression (level 9)
/// - Large object (blob) support
/// - Directory output for efficient parallel restore
///
/// The number of parallel jobs is determined from the available CPU cores (capped at 8).
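///
/// For illustration, the command assembled below is roughly the following (a sketch
/// only; connection details are placeholders, and table filter flags are appended
/// from the filter):
///
/// ```text
/// pg_dump --data-only --no-owner --format=directory --blobs --compress=9 \
///     --jobs=<n> --verbose [--exclude-table-data <t>]... [--table <t>]... \
///     --host <host> --port <port> --dbname <dbname> \
///     --username <user> --file=<output_dir>
/// ```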
pub async fn dump_data(
    source_url: &str,
    database: &str,
    output_path: &str,
    filter: &ReplicationFilter,
) -> Result<()> {
    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
    let num_cpus = std::thread::available_parallelism()
        .map(|n| n.get().min(8))
        .unwrap_or(4);

    tracing::info!(
        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
        database,
        output_path,
        num_cpus
    );

    // Parse URL and create .pgpass file for secure authentication
    let parts = crate::utils::parse_postgres_url(source_url)
        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
    let pgpass = crate::utils::PgPassFile::new(&parts)
        .context("Failed to create .pgpass file for authentication")?;

    let env_vars = parts.to_pg_env_vars();
    let output_path_owned = output_path.to_string();

    // Collect filter options
    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
    let include_tables = get_included_tables_for_db(filter, database);

    // Wrap subprocess execution with retry logic
    crate::utils::retry_subprocess_with_backoff(
        || {
            let mut cmd = Command::new("pg_dump");
            cmd.arg("--data-only")
                .arg("--no-owner")
                .arg("--format=directory") // Directory format enables parallel operations
                .arg("--blobs") // Include large objects (blobs)
                .arg("--compress=9") // Maximum compression for smaller dump size
                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
                .arg("--verbose"); // Show progress

            // Add table filtering if specified
            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
            if let Some(ref exclude) = exclude_tables {
                if !exclude.is_empty() {
                    for table in exclude {
                        cmd.arg("--exclude-table-data").arg(table);
                    }
                }
            }

            // If include_tables is specified, only dump data for those tables
            if let Some(ref include) = include_tables {
                if !include.is_empty() {
                    for table in include {
                        cmd.arg("--table").arg(table);
                    }
                }
            }

            cmd.arg("--host")
                .arg(&parts.host)
                .arg("--port")
                .arg(parts.port.to_string())
                .arg("--dbname")
                .arg(&parts.database)
                .arg(format!("--file={}", output_path_owned))
                .env("PGPASSFILE", pgpass.path())
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit());

            // Add username if specified
            if let Some(user) = &parts.user {
                cmd.arg("--username").arg(user);
            }

            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
            for (env_var, value) in &env_vars {
                cmd.env(env_var, value);
            }

            // Apply TCP keepalive parameters to prevent idle connection timeouts
            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
                cmd.env(env_var, value);
            }

            cmd.status().context(
                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
                 Install with:\n\
                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
                 - macOS: brew install postgresql\n\
                 - RHEL/CentOS: sudo yum install postgresql",
            )
        },
        3,                      // Max 3 retries
        Duration::from_secs(1), // Start with 1 second delay
        "pg_dump (dump data)",
    )
    .await
    .with_context(|| {
        format!(
            "pg_dump failed to dump data for database '{}'.\n\
             \n\
             Common causes:\n\
             - Database does not exist\n\
             - Connection authentication failed\n\
             - User lacks privileges to read table data\n\
             - Network connectivity issues or connection timeouts\n\
             - Insufficient disk space for dump directory\n\
             - Output directory already exists (pg_dump requires non-existent path)",
            database
        )
    })?;

    tracing::info!(
        "✓ Data dumped successfully using {} parallel jobs",
        num_cpus
    );
    Ok(())
}

/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
/// (those need their schema created, just not bulk data copied)
/// Returns schema-qualified names in format: "schema"."table"
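///
/// For example, given an exclude filter containing `"db1.table1"` and
/// `db_name == "db1"`, this returns `Some(vec!["\"public\".\"table1\""])`;
/// tables are assumed to live in the `public` schema.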
fn get_schema_excluded_tables_for_db(
    filter: &ReplicationFilter,
    db_name: &str,
) -> Option<Vec<String>> {
    let mut tables = BTreeSet::new();

    // Handle explicit exclude_tables (format: "database.table")
    // These tables are completely excluded (no schema, no data)
    if let Some(explicit) = filter.exclude_tables() {
        for full_name in explicit {
            let parts: Vec<&str> = full_name.split('.').collect();
            if parts.len() == 2 && parts[0] == db_name {
                // Format as "public"."table" for consistency
                tables.insert(format!("\"public\".\"{}\"", parts[1]));
            }
        }
    }

    if tables.is_empty() {
        None
    } else {
        Some(tables.into_iter().collect())
    }
}

/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
/// Excludes explicit excludes, schema_only tables, and predicate tables
/// (predicate tables will be copied separately with filtering)
/// Returns schema-qualified names in format: "schema"."table"
fn get_data_excluded_tables_for_db(
    filter: &ReplicationFilter,
    db_name: &str,
) -> Option<Vec<String>> {
    let mut tables = BTreeSet::new();

    // Handle explicit exclude_tables (format: "database.table")
    // Default to public schema for backward compatibility
    if let Some(explicit) = filter.exclude_tables() {
        for full_name in explicit {
            let parts: Vec<&str> = full_name.split('.').collect();
            if parts.len() == 2 && parts[0] == db_name {
                // Format as "public"."table" for consistency
                tables.insert(format!("\"public\".\"{}\"", parts[1]));
            }
        }
    }

    // schema_only_tables and predicate_tables already return schema-qualified names
    for table in filter.schema_only_tables(db_name) {
        tables.insert(table);
    }

    for (table, _) in filter.predicate_tables(db_name) {
        tables.insert(table);
    }

    if tables.is_empty() {
        None
    } else {
        Some(tables.into_iter().collect())
    }
}

/// Extract table names for a specific database from include_tables filter
/// Returns schema-qualified names in format: "schema"."table"
fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
    filter.include_tables().map(|tables| {
        tables
            .iter()
            .filter_map(|full_name| {
                let parts: Vec<&str> = full_name.split('.').collect();
                if parts.len() == 2 && parts[0] == db_name {
                    // Format as "public"."table" for consistency
                    Some(format!("\"public\".\"{}\"", parts[1]))
                } else {
                    None
                }
            })
            .collect()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let result = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(result.is_ok());
        assert!(output.exists());

        // Verify file contains SQL
        let content = std::fs::read_to_string(&output).unwrap();
        assert!(content.contains("CREATE ROLE") || !content.is_empty());
    }

    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        // Extract database name from URL
        let db = url.split('/').next_back().unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(result.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Schema exclusion only includes explicit exclude_tables
        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names in original order
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let sql = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Role replicator already exists"));
        assert!(rewritten.contains("CREATE ROLE replicator WITH LOGIN;"));
        assert!(rewritten.contains("ALTER ROLE replicator WITH LOGIN;"));
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let sql = "    CREATE ROLE \"Andre Admin\";\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Andre Admin already exists"));
        assert!(rewritten.contains("CREATE ROLE \"Andre Admin\""));
        assert!(rewritten.starts_with("    DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        let sql = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(sql).is_none());
    }
    }
}
826}