database_replicator/migration/
dump.rs

1// ABOUTME: Wrapper for pg_dump command to export database objects
2// ABOUTME: Handles global objects, schema, and data export
3
4use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11/// Dump global objects (roles, tablespaces) using pg_dumpall
12pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13    tracing::info!("Dumping global objects to {}", output_path);
14
15    // Parse URL and create .pgpass file for secure authentication
16    let parts = crate::utils::parse_postgres_url(source_url)
17        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18    let pgpass = crate::utils::PgPassFile::new(&parts)
19        .context("Failed to create .pgpass file for authentication")?;
20
21    let env_vars = parts.to_pg_env_vars();
22    let output_path_owned = output_path.to_string();
23
24    // Wrap subprocess execution with retry logic
25    crate::utils::retry_subprocess_with_backoff(
26        || {
27            let mut cmd = Command::new("pg_dumpall");
28            cmd.arg("--globals-only")
29                .arg("--no-role-passwords") // Don't dump passwords
30                .arg("--verbose") // Show progress
31                .arg("--host")
32                .arg(&parts.host)
33                .arg("--port")
34                .arg(parts.port.to_string())
35                .arg("--database")
36                .arg(&parts.database)
37                .arg(format!("--file={}", output_path_owned))
38                .env("PGPASSFILE", pgpass.path())
39                .stdout(Stdio::inherit())
40                .stderr(Stdio::inherit());
41
42            // Add username if specified
43            if let Some(user) = &parts.user {
44                cmd.arg("--username").arg(user);
45            }
46
47            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
48            for (env_var, value) in &env_vars {
49                cmd.env(env_var, value);
50            }
51
52            // Apply TCP keepalive parameters to prevent idle connection timeouts
53            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54                cmd.env(env_var, value);
55            }
56
57            cmd.status().context(
58                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59                 Install with:\n\
60                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61                 - macOS: brew install postgresql\n\
62                 - RHEL/CentOS: sudo yum install postgresql",
63            )
64        },
65        3,                      // Max 3 retries
66        Duration::from_secs(1), // Start with 1 second delay
67        "pg_dumpall (dump globals)",
68    )
69    .await
70    .context(
71        "pg_dumpall failed to dump global objects.\n\
72         \n\
73         Common causes:\n\
74         - Connection authentication failed\n\
75         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76         - Network connectivity issues\n\
77         - Invalid connection string\n\
78         - Connection timeout or network issues",
79    )?;
80
81    tracing::info!("✓ Global objects dumped successfully");
82    Ok(())
83}
84
85/// Update a globals dump so duplicate role creation errors become harmless notices.
86///
87/// `pg_dumpall --globals-only` emits `CREATE ROLE` statements that fail if the
88/// role already exists on the target cluster. When an operator reruns
89/// replication against the same target, those statements cause `psql` to exit
90/// with status 3 which previously triggered noisy retries and prevented the rest
91/// of the globals from being applied. By wrapping each `CREATE ROLE` statement
92/// in a `DO $$ ... EXCEPTION WHEN duplicate_object` block, we allow Postgres to
93/// skip recreating existing roles while still applying subsequent `ALTER ROLE`
94/// and `GRANT` statements.
95pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96    let content = fs::read_to_string(path)
97        .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99    if let Some(updated) = rewrite_create_role_statements(&content) {
100        fs::write(path, updated)
101            .with_context(|| format!("Failed to update globals dump at {}", path))?;
102    }
103
104    Ok(())
105}
106
107/// Comments out `ALTER ROLE ... SUPERUSER` statements in a globals dump file.
108///
109/// Managed Postgres services (e.g., AWS RDS) often prevent the restore user
110/// from granting SUPERUSER. Commenting out those lines keeps the restore
111/// moving without permission errors.
112pub fn remove_superuser_from_globals(path: &str) -> Result<()> {
113    let content = fs::read_to_string(path)
114        .with_context(|| format!("Failed to read globals dump at {}", path))?;
115
116    let mut updated = String::with_capacity(content.len());
117    let mut modified = false;
118    for line in content.lines() {
119        if line.contains("ALTER ROLE") && line.contains("SUPERUSER") {
120            updated.push_str("-- ");
121            updated.push_str(line);
122            updated.push('\n');
123            modified = true;
124        } else {
125            updated.push_str(line);
126            updated.push('\n');
127        }
128    }
129
130    if modified {
131        fs::write(path, updated)
132            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
133    }
134
135    Ok(())
136}
137
138/// Removes parameter settings that require superuser privileges (e.g. `log_statement`).
139///
140/// AWS RDS prevents standard replication roles from altering certain GUCs via
141/// `ALTER ROLE ... SET`. Each offending line is commented out so `psql` skips
142/// them without aborting the rest of the globals restore.
143pub fn remove_restricted_guc_settings(path: &str) -> Result<()> {
144    let content = fs::read_to_string(path)
145        .with_context(|| format!("Failed to read globals dump at {}", path))?;
146
147    let mut updated = String::with_capacity(content.len());
148    let mut modified = false;
149
150    for line in content.lines() {
151        let lower_line = line.to_ascii_lowercase();
152        if lower_line.contains("alter role") && lower_line.contains("set") {
153            updated.push_str("-- ");
154            updated.push_str(line);
155            updated.push('\n');
156            modified = true;
157        } else {
158            updated.push_str(line);
159            updated.push('\n');
160        }
161    }
162
163    if modified {
164        fs::write(path, updated)
165            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
166    }
167
168    Ok(())
169}
170
171/// Comments out `GRANT` statements for roles that are restricted on managed services.
172///
173/// AWS RDS and other managed services may prevent granting certain default roles
174/// like `pg_checkpoint`. This function comments out those statements to allow
175/// the globals restore to proceed without permission errors.
176pub fn remove_restricted_role_grants(path: &str) -> Result<()> {
177    // Roles that cannot be granted on managed PostgreSQL services (AWS RDS, etc.)
178    const RESTRICTED_ROLES: &[&str] = &[
179        "pg_checkpoint",
180        "pg_read_all_data",
181        "pg_write_all_data",
182        "pg_read_all_settings",
183        "pg_read_all_stats",
184        "pg_stat_scan_tables",
185        "pg_monitor",
186        "pg_signal_backend",
187        "pg_read_server_files",
188        "pg_write_server_files",
189        "pg_execute_server_program",
190        "pg_create_subscription",
191        "pg_maintain",
192        "pg_use_reserved_connections",
193    ];
194
195    let content = fs::read_to_string(path)
196        .with_context(|| format!("Failed to read globals dump at {}", path))?;
197
198    let mut updated = String::with_capacity(content.len());
199    let mut modified = false;
200
201    for line in content.lines() {
202        let lower_trimmed = line.trim().to_ascii_lowercase();
203        if lower_trimmed.starts_with("grant ") {
204            let is_restricted = RESTRICTED_ROLES.iter().any(|role| {
205                // Get the role being granted (2nd word), stripping any quotes
206                // e.g. "grant pg_checkpoint to some_user" or "grant \"pg_checkpoint\" to some_user"
207                lower_trimmed
208                    .split_whitespace()
209                    .nth(1)
210                    .map(|r| r.trim_matches('"') == *role)
211                    .unwrap_or(false)
212            });
213
214            if is_restricted {
215                updated.push_str("-- ");
216                updated.push_str(line);
217                updated.push('\n');
218                modified = true;
219                continue;
220            }
221        }
222
223        updated.push_str(line);
224        updated.push('\n');
225    }
226
227    if modified {
228        fs::write(path, updated)
229            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
230    }
231
232    Ok(())
233}
234
/// Wraps every `CREATE ROLE` line of a globals dump in an exception-handling
/// `DO` block (see [`wrap_create_role_line`]).
///
/// `sql` is processed line by line, and each line's terminator is preserved.
/// All other lines are copied through verbatim. Returns `None` when `sql` is
/// empty or contains no `CREATE ROLE` line that could be rewritten, so callers
/// can skip writing the file back.
fn rewrite_create_role_statements(sql: &str) -> Option<String> {
    if sql.is_empty() {
        return None;
    }

    let mut output = String::with_capacity(sql.len() + 1024);
    let mut modified = false;

    // `split_inclusive` keeps each line's '\n' and also yields a final
    // unterminated line when the input does not end with a newline.
    for chunk in sql.split_inclusive('\n') {
        match wrap_create_role_line(chunk) {
            Some(transformed) => {
                output.push_str(&transformed);
                modified = true;
            }
            None => output.push_str(chunk),
        }
    }

    if modified {
        Some(output)
    } else {
        None
    }
}

/// Rewrites one `CREATE ROLE` line into a `DO` block that turns a
/// `duplicate_object` error into a `NOTICE`.
///
/// `chunk` is a single line including its terminator: `\n`, `\r\n`, or none for
/// the last line. The generated block has these properties:
/// - the line's leading whitespace is repeated on every generated line;
/// - the original terminator follows the final `END ...;`;
/// - its inner lines are separated by `\n`.
///
/// Returns `None` for lines that do not start with `CREATE ROLE ` (after
/// leading whitespace), or whose role name cannot be extracted.
fn wrap_create_role_line(chunk: &str) -> Option<String> {
    let trimmed = chunk.trim_start();
    if !trimmed.starts_with("CREATE ROLE ") {
        return None;
    }

    let statement_body = trimmed.trim_end().trim_end_matches(';').trim_end();
    let leading_ws = &chunk[..chunk.len() - trimmed.len()];
    let newline = if chunk.ends_with("\r\n") {
        "\r\n"
    } else if chunk.ends_with('\n') {
        "\n"
    } else {
        ""
    };

    let role_token = extract_role_token(statement_body)?;

    // The name is embedded in a RAISE format string literal. Single quotes must
    // be doubled for the SQL literal. `%` must be doubled too, because RAISE
    // treats a bare `%` as a parameter placeholder: a role such as "100%" would
    // otherwise make the whole DO block fail ("too few parameters specified for
    // RAISE").
    let notice_name =
        escape_single_quotes(&unquote_role_name(&role_token)).replace('%', "%%");

    // A quoted role name may itself contain `$$`, so pick a delimiter that does
    // not occur in the quoted body. The surrounding boilerplate contains no `$`.
    let tag = dollar_quote_tag(&[statement_body, notice_name.as_str()]);

    let lines = [
        format!("DO {}", tag),
        "BEGIN".to_string(),
        format!("    {};", statement_body),
        "EXCEPTION".to_string(),
        "    WHEN duplicate_object THEN".to_string(),
        format!(
            "        RAISE NOTICE 'Role {} already exists on target, skipping CREATE ROLE';",
            notice_name
        ),
        format!("END {};", tag),
    ];

    let mut block = String::with_capacity(chunk.len() + 128);
    for (index, line) in lines.iter().enumerate() {
        if index > 0 {
            block.push('\n');
        }
        block.push_str(leading_ws);
        block.push_str(line);
    }
    block.push_str(newline);

    Some(block)
}

/// Picks a dollar-quote delimiter that does not occur in any of `contents`.
///
/// Prefers the conventional `$$`. It falls back to `$role0$`, `$role1$`, ... only
/// when `$$` appears in the text, which would otherwise end the `DO` body early.
fn dollar_quote_tag(contents: &[&str]) -> String {
    let collides = |tag: &str| contents.iter().any(|text| text.contains(tag));
    if !collides("$$") {
        return "$$".to_string();
    }
    let mut suffix: u64 = 0;
    loop {
        let tag = format!("$role{}$", suffix);
        if !collides(&tag) {
            return tag;
        }
        suffix += 1;
    }
}

/// Extracts the role identifier that follows `CREATE ROLE` in `statement`.
///
/// A double-quoted identifier is returned with its quotes and any `""` escapes
/// intact. An unterminated quoted identifier yields the rest of the statement.
/// An unquoted identifier ends at the first whitespace or `;`. Returns `None`
/// if `statement` does not start with `CREATE ROLE` or no identifier follows.
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if remainder.starts_with('"') {
        let bytes = remainder.as_bytes();
        let mut idx = 1;
        // Scan for the closing quote; `""` is an escaped quote inside the name.
        while idx < bytes.len() {
            if bytes[idx] == b'"' {
                if bytes.get(idx + 1) == Some(&b'"') {
                    idx += 2;
                    continue;
                }
                idx += 1;
                break;
            }
            idx += 1;
        }
        // `idx` now sits just past the closing quote (an ASCII byte, hence a
        // char boundary), or at `remainder.len()` if the quote is unterminated.
        return Some(remainder[..idx].to_string());
    }

    let end = remainder
        .find(|ch: char| ch.is_whitespace() || ch == ';')
        .unwrap_or(remainder.len());
    if end == 0 {
        None
    } else {
        Some(remainder[..end].to_string())
    }
}

/// Strips surrounding double quotes from an identifier token and unescapes `""`.
/// Tokens that are not fully quoted are returned unchanged.
fn unquote_role_name(token: &str) -> String {
    if token.starts_with('"') && token.ends_with('"') && token.len() >= 2 {
        let inner = &token[1..token.len() - 1];
        inner.replace("\"\"", "\"")
    } else {
        token.to_string()
    }
}

/// Doubles single quotes so `value` can be embedded in a SQL string literal.
fn escape_single_quotes(value: &str) -> String {
    value.replace('\'', "''")
}
373
374/// Dump schema (DDL) for a specific database
375pub async fn dump_schema(
376    source_url: &str,
377    database: &str,
378    output_path: &str,
379    filter: &ReplicationFilter,
380) -> Result<()> {
381    tracing::info!(
382        "Dumping schema for database '{}' to {}",
383        database,
384        output_path
385    );
386
387    // Parse URL and create .pgpass file for secure authentication
388    let parts = crate::utils::parse_postgres_url(source_url)
389        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
390    let pgpass = crate::utils::PgPassFile::new(&parts)
391        .context("Failed to create .pgpass file for authentication")?;
392
393    let env_vars = parts.to_pg_env_vars();
394    let output_path_owned = output_path.to_string();
395
396    // Collect filter options
397    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
398    let include_tables = get_included_tables_for_db(filter, database);
399
400    // Wrap subprocess execution with retry logic
401    crate::utils::retry_subprocess_with_backoff(
402        || {
403            let mut cmd = Command::new("pg_dump");
404            cmd.arg("--schema-only")
405                .arg("--no-owner") // Don't include ownership commands
406                .arg("--no-privileges") // We'll handle privileges separately
407                .arg("--verbose"); // Show progress
408
409            // Add table filtering if specified
410            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
411            if let Some(ref exclude) = exclude_tables {
412                if !exclude.is_empty() {
413                    for table in exclude {
414                        cmd.arg("--exclude-table").arg(table);
415                    }
416                }
417            }
418
419            // If include_tables is specified, only dump those tables
420            if let Some(ref include) = include_tables {
421                if !include.is_empty() {
422                    for table in include {
423                        cmd.arg("--table").arg(table);
424                    }
425                }
426            }
427
428            cmd.arg("--host")
429                .arg(&parts.host)
430                .arg("--port")
431                .arg(parts.port.to_string())
432                .arg("--dbname")
433                .arg(&parts.database)
434                .arg(format!("--file={}", output_path_owned))
435                .env("PGPASSFILE", pgpass.path())
436                .stdout(Stdio::inherit())
437                .stderr(Stdio::inherit());
438
439            // Add username if specified
440            if let Some(user) = &parts.user {
441                cmd.arg("--username").arg(user);
442            }
443
444            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
445            for (env_var, value) in &env_vars {
446                cmd.env(env_var, value);
447            }
448
449            // Apply TCP keepalive parameters to prevent idle connection timeouts
450            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
451                cmd.env(env_var, value);
452            }
453
454            cmd.status().context(
455                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
456                 Install with:\n\
457                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
458                 - macOS: brew install postgresql\n\
459                 - RHEL/CentOS: sudo yum install postgresql",
460            )
461        },
462        3,                      // Max 3 retries
463        Duration::from_secs(1), // Start with 1 second delay
464        "pg_dump (dump schema)",
465    )
466    .await
467    .with_context(|| {
468        format!(
469            "pg_dump failed to dump schema for database '{}'.\n\
470             \n\
471             Common causes:\n\
472             - Database does not exist\n\
473             - Connection authentication failed\n\
474             - User lacks privileges to read database schema\n\
475             - Network connectivity issues\n\
476             - Connection timeout or network issues",
477            database
478        )
479    })?;
480
481    tracing::info!("✓ Schema dumped successfully");
482    Ok(())
483}
484
485/// Dump data for a specific database using optimized directory format
486///
487/// Uses PostgreSQL directory format dump with:
488/// - Parallel dumps for faster performance
489/// - Maximum compression (level 9)
490/// - Large object (blob) support
491/// - Directory output for efficient parallel restore
492///
493/// The number of parallel jobs is automatically determined based on available CPU cores.
494pub async fn dump_data(
495    source_url: &str,
496    database: &str,
497    output_path: &str,
498    filter: &ReplicationFilter,
499) -> Result<()> {
500    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
501    let num_cpus = std::thread::available_parallelism()
502        .map(|n| n.get().min(8))
503        .unwrap_or(4);
504
505    tracing::info!(
506        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
507        database,
508        output_path,
509        num_cpus
510    );
511
512    // Parse URL and create .pgpass file for secure authentication
513    let parts = crate::utils::parse_postgres_url(source_url)
514        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
515    let pgpass = crate::utils::PgPassFile::new(&parts)
516        .context("Failed to create .pgpass file for authentication")?;
517
518    let env_vars = parts.to_pg_env_vars();
519    let output_path_owned = output_path.to_string();
520
521    // Collect filter options
522    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
523    let include_tables = get_included_tables_for_db(filter, database);
524
525    // Wrap subprocess execution with retry logic
526    crate::utils::retry_subprocess_with_backoff(
527        || {
528            let mut cmd = Command::new("pg_dump");
529            cmd.arg("--data-only")
530                .arg("--no-owner")
531                .arg("--format=directory") // Directory format enables parallel operations
532                .arg("--blobs") // Include large objects (blobs)
533                .arg("--compress=9") // Maximum compression for smaller dump size
534                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
535                .arg("--verbose"); // Show progress
536
537            // Add table filtering if specified
538            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
539            if let Some(ref exclude) = exclude_tables {
540                if !exclude.is_empty() {
541                    for table in exclude {
542                        cmd.arg("--exclude-table-data").arg(table);
543                    }
544                }
545            }
546
547            // If include_tables is specified, only dump data for those tables
548            if let Some(ref include) = include_tables {
549                if !include.is_empty() {
550                    for table in include {
551                        cmd.arg("--table").arg(table);
552                    }
553                }
554            }
555
556            cmd.arg("--host")
557                .arg(&parts.host)
558                .arg("--port")
559                .arg(parts.port.to_string())
560                .arg("--dbname")
561                .arg(&parts.database)
562                .arg(format!("--file={}", output_path_owned))
563                .env("PGPASSFILE", pgpass.path())
564                .stdout(Stdio::inherit())
565                .stderr(Stdio::inherit());
566
567            // Add username if specified
568            if let Some(user) = &parts.user {
569                cmd.arg("--username").arg(user);
570            }
571
572            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
573            for (env_var, value) in &env_vars {
574                cmd.env(env_var, value);
575            }
576
577            // Apply TCP keepalive parameters to prevent idle connection timeouts
578            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
579                cmd.env(env_var, value);
580            }
581
582            cmd.status().context(
583                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
584                 Install with:\n\
585                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
586                 - macOS: brew install postgresql\n\
587                 - RHEL/CentOS: sudo yum install postgresql",
588            )
589        },
590        3,                      // Max 3 retries
591        Duration::from_secs(1), // Start with 1 second delay
592        "pg_dump (dump data)",
593    )
594    .await
595    .with_context(|| {
596        format!(
597            "pg_dump failed to dump data for database '{}'.\n\
598             \n\
599             Common causes:\n\
600             - Database does not exist\n\
601             - Connection authentication failed\n\
602             - User lacks privileges to read table data\n\
603             - Network connectivity issues\n\
604             - Insufficient disk space for dump directory\n\
605             - Output directory already exists (pg_dump requires non-existent path)\n\
606             - Connection timeout or network issues",
607            database
608        )
609    })?;
610
611    tracing::info!(
612        "✓ Data dumped successfully using {} parallel jobs",
613        num_cpus
614    );
615    Ok(())
616}
617
618/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
619/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
620/// (those need their schema created, just not bulk data copied)
621/// Returns schema-qualified names in format: "schema"."table"
622fn get_schema_excluded_tables_for_db(
623    filter: &ReplicationFilter,
624    db_name: &str,
625) -> Option<Vec<String>> {
626    let mut tables = BTreeSet::new();
627
628    // Handle explicit exclude_tables (format: "database.table")
629    // These tables are completely excluded (no schema, no data)
630    if let Some(explicit) = filter.exclude_tables() {
631        for full_name in explicit {
632            let parts: Vec<&str> = full_name.split('.').collect();
633            if parts.len() == 2 && parts[0] == db_name {
634                // Format as "public"."table" for consistency
635                tables.insert(format!("\"public\".\"{}\"", parts[1]));
636            }
637        }
638    }
639
640    if tables.is_empty() {
641        None
642    } else {
643        Some(tables.into_iter().collect())
644    }
645}
646
647/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
648/// Excludes explicit excludes, schema_only tables, and predicate tables
649/// (predicate tables will be copied separately with filtering)
650/// Returns schema-qualified names in format: "schema"."table"
651fn get_data_excluded_tables_for_db(
652    filter: &ReplicationFilter,
653    db_name: &str,
654) -> Option<Vec<String>> {
655    let mut tables = BTreeSet::new();
656
657    // Handle explicit exclude_tables (format: "database.table")
658    // Default to public schema for backward compatibility
659    if let Some(explicit) = filter.exclude_tables() {
660        for full_name in explicit {
661            let parts: Vec<&str> = full_name.split('.').collect();
662            if parts.len() == 2 && parts[0] == db_name {
663                // Format as "public"."table" for consistency
664                tables.insert(format!("\"public\".\"{}\"", parts[1]));
665            }
666        }
667    }
668
669    // schema_only_tables and predicate_tables already return schema-qualified names
670    for table in filter.schema_only_tables(db_name) {
671        tables.insert(table);
672    }
673
674    for (table, _) in filter.predicate_tables(db_name) {
675        tables.insert(table);
676    }
677
678    if tables.is_empty() {
679        None
680    } else {
681        Some(tables.into_iter().collect())
682    }
683}
684
685/// Extract table names for a specific database from include_tables filter
686/// Returns schema-qualified names in format: "schema"."table"
687fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
688    filter.include_tables().map(|tables| {
689        tables
690            .iter()
691            .filter_map(|full_name| {
692                let parts: Vec<&str> = full_name.split('.').collect();
693                if parts.len() == 2 && parts[0] == db_name {
694                    // Format as "public"."table" for consistency
695                    Some(format!("\"public\".\"{}\"", parts[1]))
696                } else {
697                    None
698                }
699            })
700            .collect()
701    })
702}
703
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Extracts the database name from a connection URL such as
    /// `postgresql://user:pass@host:5432/mydb?sslmode=require`, ignoring the
    /// query string (which may itself contain `/`, e.g. certificate paths).
    fn database_from_url(url: &str) -> &str {
        let without_query = url.split('?').next().unwrap_or(url);
        without_query.rsplit('/').next().unwrap_or("postgres")
    }

    #[test]
    fn test_database_from_url_strips_query() {
        assert_eq!(
            database_from_url(
                "postgresql://u:p@h:5432/mydb?sslmode=require&sslrootcert=/etc/ca.pem"
            ),
            "mydb"
        );
        assert_eq!(database_from_url("postgresql://h/other"), "other");
    }

    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let result = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(result.is_ok());
        assert!(output.exists());

        // pg_dumpall always writes at least its header comments.
        let content = std::fs::read_to_string(&output).unwrap();
        assert!(!content.is_empty(), "globals dump should not be empty");
    }

    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        let db = database_from_url(&url);

        let filter = crate::filters::ReplicationFilter::empty();
        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(result.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Schema exclusion only includes explicit exclude_tables
        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names in original order
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let sql = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Role replicator already exists"));
        assert!(rewritten.contains("CREATE ROLE replicator WITH LOGIN;"));
        assert!(rewritten.contains("ALTER ROLE replicator WITH LOGIN;"));
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let sql = "    CREATE ROLE \"Andre Admin\";\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Andre Admin already exists"));
        assert!(rewritten.contains("CREATE ROLE \"Andre Admin\""));
        assert!(rewritten.starts_with("    DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        let sql = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(sql).is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_preserves_crlf() {
        let rewritten =
            rewrite_create_role_statements("CREATE ROLE bob;\r\n").expect("rewrite expected");
        assert!(rewritten.ends_with("END $$;\r\n"));
    }

    #[test]
    fn test_extract_role_token_handles_escaped_quotes() {
        assert_eq!(
            extract_role_token("CREATE ROLE \"a\"\"b\" WITH LOGIN").as_deref(),
            Some("\"a\"\"b\"")
        );
        assert_eq!(unquote_role_name("\"a\"\"b\""), "a\"b");
    }

    #[test]
    fn test_sanitize_globals_dump_wraps_create_role() {
        let dir = tempdir().unwrap();
        let globals_file = dir.path().join("globals.sql");
        std::fs::write(&globals_file, "CREATE ROLE app;\nALTER ROLE app WITH LOGIN;\n").unwrap();

        sanitize_globals_dump(globals_file.to_str().unwrap()).unwrap();

        let result = std::fs::read_to_string(&globals_file).unwrap();
        assert!(result.starts_with("DO $$\n"));
        assert!(result.contains("    CREATE ROLE app;\n"));
        assert!(result.contains("\nALTER ROLE app WITH LOGIN;\n"));
    }

    #[test]
    fn test_sanitize_globals_dump_leaves_file_without_create_role() {
        let dir = tempdir().unwrap();
        let globals_file = dir.path().join("globals.sql");
        let content = "ALTER ROLE app WITH LOGIN;";
        std::fs::write(&globals_file, content).unwrap();

        sanitize_globals_dump(globals_file.to_str().unwrap()).unwrap();

        assert_eq!(std::fs::read_to_string(&globals_file).unwrap(), content);
    }

    #[test]
    fn test_remove_superuser_from_globals() {
        let dir = tempdir().unwrap();
        let globals_file = dir.path().join("globals.sql");
        std::fs::write(
            &globals_file,
            "CREATE ROLE admin;\nALTER ROLE admin WITH SUPERUSER LOGIN;\nGRANT myrole TO admin;\n",
        )
        .unwrap();

        remove_superuser_from_globals(globals_file.to_str().unwrap()).unwrap();

        assert_eq!(
            std::fs::read_to_string(&globals_file).unwrap(),
            "CREATE ROLE admin;\n-- ALTER ROLE admin WITH SUPERUSER LOGIN;\nGRANT myrole TO admin;\n"
        );
    }

    #[test]
    fn test_remove_restricted_guc_settings() {
        let dir = tempdir().unwrap();
        let globals_file = dir.path().join("globals.sql");
        std::fs::write(
            &globals_file,
            "CREATE ROLE app;\n\
             ALTER ROLE app WITH LOGIN;\n\
             ALTER ROLE app SET log_statement TO 'all';\n\
             ALTER ROLE app IN DATABASE db SET work_mem TO '64MB';\n",
        )
        .unwrap();

        remove_restricted_guc_settings(globals_file.to_str().unwrap()).unwrap();

        assert_eq!(
            std::fs::read_to_string(&globals_file).unwrap(),
            "CREATE ROLE app;\n\
             ALTER ROLE app WITH LOGIN;\n\
             -- ALTER ROLE app SET log_statement TO 'all';\n\
             -- ALTER ROLE app IN DATABASE db SET work_mem TO '64MB';\n"
        );
    }

    #[test]
    fn test_remove_restricted_role_grants() {
        let dir = tempdir().unwrap();
        let globals_file = dir.path().join("globals.sql");

        // Write a sample globals dump with restricted role grants
        let content = r#"CREATE ROLE myuser;
ALTER ROLE myuser WITH LOGIN;
GRANT pg_checkpoint TO myuser;
GRANT "pg_read_all_stats" TO myuser;
GRANT pg_monitor TO myuser;
GRANT myrole TO myuser;
GRANT SELECT ON TABLE users TO myuser;
"#;
        std::fs::write(&globals_file, content).unwrap();

        // Run the sanitization
        remove_restricted_role_grants(globals_file.to_str().unwrap()).unwrap();

        // Verify restricted grants are commented out
        let result = std::fs::read_to_string(&globals_file).unwrap();

        // Restricted role grants should be commented out
        assert!(result.contains("-- GRANT pg_checkpoint TO myuser;"));
        assert!(result.contains("-- GRANT \"pg_read_all_stats\" TO myuser;"));
        assert!(result.contains("-- GRANT pg_monitor TO myuser;"));

        // Non-restricted grants should remain
        assert!(result.contains("\nGRANT myrole TO myuser;\n"));
        assert!(result.contains("\nGRANT SELECT ON TABLE users TO myuser;\n"));

        // Other statements should remain unchanged
        assert!(result.contains("CREATE ROLE myuser;"));
        assert!(result.contains("ALTER ROLE myuser WITH LOGIN;"));
    }
}