database_replicator/migration/
dump.rs

1// ABOUTME: Wrapper for pg_dump command to export database objects
2// ABOUTME: Handles global objects, schema, and data export
3
4use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11/// Dump global objects (roles, tablespaces) using pg_dumpall
12pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13    tracing::info!("Dumping global objects to {}", output_path);
14
15    // Parse URL and create .pgpass file for secure authentication
16    let parts = crate::utils::parse_postgres_url(source_url)
17        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18    let pgpass = crate::utils::PgPassFile::new(&parts)
19        .context("Failed to create .pgpass file for authentication")?;
20
21    let env_vars = parts.to_pg_env_vars();
22    let output_path_owned = output_path.to_string();
23
24    // Wrap subprocess execution with retry logic
25    crate::utils::retry_subprocess_with_backoff(
26        || {
27            let mut cmd = Command::new("pg_dumpall");
28            cmd.arg("--globals-only")
29                .arg("--no-role-passwords") // Don't dump passwords
30                .arg("--verbose") // Show progress
31                .arg("--host")
32                .arg(&parts.host)
33                .arg("--port")
34                .arg(parts.port.to_string())
35                .arg("--database")
36                .arg(&parts.database)
37                .arg(format!("--file={}", output_path_owned))
38                .env("PGPASSFILE", pgpass.path())
39                .stdout(Stdio::inherit())
40                .stderr(Stdio::inherit());
41
42            // Add username if specified
43            if let Some(user) = &parts.user {
44                cmd.arg("--username").arg(user);
45            }
46
47            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
48            for (env_var, value) in &env_vars {
49                cmd.env(env_var, value);
50            }
51
52            // Apply TCP keepalive parameters to prevent idle connection timeouts
53            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54                cmd.env(env_var, value);
55            }
56
57            cmd.status().context(
58                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59                 Install with:\n\
60                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61                 - macOS: brew install postgresql\n\
62                 - RHEL/CentOS: sudo yum install postgresql",
63            )
64        },
65        3,                      // Max 3 retries
66        Duration::from_secs(1), // Start with 1 second delay
67        "pg_dumpall (dump globals)",
68    )
69    .await
70    .context(
71        "pg_dumpall failed to dump global objects.\n\
72         \n\
73         Common causes:\n\
74         - Connection authentication failed\n\
75         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76         - Network connectivity issues\n\
77         - Invalid connection string\n\
78         - Connection timeout or network issues",
79    )?;
80
81    tracing::info!("✓ Global objects dumped successfully");
82    Ok(())
83}
84
85/// Update a globals dump so duplicate role creation errors become harmless notices.
86///
87/// `pg_dumpall --globals-only` emits `CREATE ROLE` statements that fail if the
88/// role already exists on the target cluster. When an operator reruns
89/// replication against the same target, those statements cause `psql` to exit
90/// with status 3 which previously triggered noisy retries and prevented the rest
91/// of the globals from being applied. By wrapping each `CREATE ROLE` statement
92/// in a `DO $$ ... EXCEPTION WHEN duplicate_object` block, we allow Postgres to
93/// skip recreating existing roles while still applying subsequent `ALTER ROLE`
94/// and `GRANT` statements.
95pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96    let content = fs::read_to_string(path)
97        .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99    if let Some(updated) = rewrite_create_role_statements(&content) {
100        fs::write(path, updated)
101            .with_context(|| format!("Failed to update globals dump at {}", path))?;
102    }
103
104    Ok(())
105}
106
107/// Comments out `ALTER ROLE ... SUPERUSER` statements in a globals dump file.
108///
109/// Managed Postgres services (e.g., AWS RDS) often prevent the restore user
110/// from granting SUPERUSER. Commenting out those lines keeps the restore
111/// moving without permission errors.
112pub fn remove_superuser_from_globals(path: &str) -> Result<()> {
113    let content = fs::read_to_string(path)
114        .with_context(|| format!("Failed to read globals dump at {}", path))?;
115
116    let mut updated = String::with_capacity(content.len());
117    let mut modified = false;
118    for line in content.lines() {
119        if line.contains("ALTER ROLE") && line.contains("SUPERUSER") {
120            updated.push_str("-- ");
121            updated.push_str(line);
122            updated.push('\n');
123            modified = true;
124        } else {
125            updated.push_str(line);
126            updated.push('\n');
127        }
128    }
129
130    if modified {
131        fs::write(path, updated)
132            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
133    }
134
135    Ok(())
136}
137
138/// Removes parameter settings that require superuser privileges (e.g. `log_statement`).
139///
140/// AWS RDS prevents standard replication roles from altering certain GUCs via
141/// `ALTER ROLE ... SET`. Each offending line is commented out so `psql` skips
142/// them without aborting the rest of the globals restore.
143pub fn remove_restricted_guc_settings(path: &str) -> Result<()> {
144    let content = fs::read_to_string(path)
145        .with_context(|| format!("Failed to read globals dump at {}", path))?;
146
147    let mut updated = String::with_capacity(content.len());
148    let mut modified = false;
149
150    for line in content.lines() {
151        let lower_line = line.to_ascii_lowercase();
152        if lower_line.contains("alter role") && lower_line.contains("set") {
153            updated.push_str("-- ");
154            updated.push_str(line);
155            updated.push('\n');
156            modified = true;
157        } else {
158            updated.push_str(line);
159            updated.push('\n');
160        }
161    }
162
163    if modified {
164        fs::write(path, updated)
165            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
166    }
167
168    Ok(())
169}
170
171/// Comments out `GRANT` statements for roles that are restricted on managed services.
172///
173/// AWS RDS and other managed services may prevent granting certain default roles
174/// like `pg_checkpoint`. This function also filters out GRANT statements that use
175/// `GRANTED BY` clauses referencing RDS admin roles (e.g., `rdsadmin`).
176pub fn remove_restricted_role_grants(path: &str) -> Result<()> {
177    // Roles that cannot be granted on managed PostgreSQL services (AWS RDS, etc.)
178    const RESTRICTED_ROLES: &[&str] = &[
179        "pg_checkpoint",
180        "pg_read_all_data",
181        "pg_write_all_data",
182        "pg_read_all_settings",
183        "pg_read_all_stats",
184        "pg_stat_scan_tables",
185        "pg_monitor",
186        "pg_signal_backend",
187        "pg_read_server_files",
188        "pg_write_server_files",
189        "pg_execute_server_program",
190        "pg_create_subscription",
191        "pg_maintain",
192        "pg_use_reserved_connections",
193    ];
194
195    // Roles that cannot be used as grantors in GRANTED BY clauses
196    const RESTRICTED_GRANTORS: &[&str] = &[
197        "rdsadmin",
198        "rds_superuser",
199        "rdsrepladmin",
200        "rds_replication",
201    ];
202
203    let content = fs::read_to_string(path)
204        .with_context(|| format!("Failed to read globals dump at {}", path))?;
205
206    let mut updated = String::with_capacity(content.len());
207    let mut modified = false;
208
209    for line in content.lines() {
210        let lower_trimmed = line.trim().to_ascii_lowercase();
211        if lower_trimmed.starts_with("grant ") {
212            // Check if granting a restricted role
213            let is_restricted_role = RESTRICTED_ROLES.iter().any(|role| {
214                // Get the role being granted (2nd word), stripping any quotes
215                // e.g. "grant pg_checkpoint to some_user" or "grant \"pg_checkpoint\" to some_user"
216                lower_trimmed
217                    .split_whitespace()
218                    .nth(1)
219                    .map(|r| r.trim_matches('"') == *role)
220                    .unwrap_or(false)
221            });
222
223            // Check if using a restricted grantor in GRANTED BY clause
224            let has_restricted_grantor = RESTRICTED_GRANTORS.iter().any(|grantor| {
225                // Look for "granted by rdsadmin" or "granted by \"rdsadmin\""
226                lower_trimmed.contains(&format!("granted by {}", grantor))
227                    || lower_trimmed.contains(&format!("granted by \"{}\"", grantor))
228            });
229
230            if is_restricted_role || has_restricted_grantor {
231                updated.push_str("-- ");
232                updated.push_str(line);
233                updated.push('\n');
234                modified = true;
235                continue;
236            }
237        }
238
239        updated.push_str(line);
240        updated.push('\n');
241    }
242
243    if modified {
244        fs::write(path, updated)
245            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
246    }
247
248    Ok(())
249}
250
251fn rewrite_create_role_statements(sql: &str) -> Option<String> {
252    if sql.is_empty() {
253        return None;
254    }
255
256    let mut output = String::with_capacity(sql.len() + 1024);
257    let mut modified = false;
258    let mut cursor = 0;
259
260    while cursor < sql.len() {
261        if let Some(rel_pos) = sql[cursor..].find('\n') {
262            let end = cursor + rel_pos + 1;
263            let chunk = &sql[cursor..end];
264            if let Some(transformed) = wrap_create_role_line(chunk) {
265                output.push_str(&transformed);
266                modified = true;
267            } else {
268                output.push_str(chunk);
269            }
270            cursor = end;
271        } else {
272            let chunk = &sql[cursor..];
273            if let Some(transformed) = wrap_create_role_line(chunk) {
274                output.push_str(&transformed);
275                modified = true;
276            } else {
277                output.push_str(chunk);
278            }
279            break;
280        }
281    }
282
283    if modified {
284        Some(output)
285    } else {
286        None
287    }
288}
289
290fn wrap_create_role_line(chunk: &str) -> Option<String> {
291    let trimmed = chunk.trim_start();
292    if !trimmed.starts_with("CREATE ROLE ") {
293        return None;
294    }
295
296    let statement = trimmed.trim_end();
297    let statement_body = statement.trim_end_matches(';').trim_end();
298    let leading_ws_len = chunk.len() - trimmed.len();
299    let leading_ws = &chunk[..leading_ws_len];
300    let newline = if chunk.ends_with("\r\n") {
301        "\r\n"
302    } else if chunk.ends_with('\n') {
303        "\n"
304    } else {
305        ""
306    };
307
308    let role_token = extract_role_token(statement_body)?;
309
310    let notice_name = escape_single_quotes(&unquote_role_name(&role_token));
311
312    let mut block = String::with_capacity(chunk.len() + 128);
313    block.push_str(leading_ws);
314    block.push_str("DO $$\n");
315    block.push_str(leading_ws);
316    block.push_str("BEGIN\n");
317    block.push_str(leading_ws);
318    block.push_str("    ");
319    block.push_str(statement_body);
320    block.push_str(";\n");
321    block.push_str(leading_ws);
322    block.push_str("EXCEPTION\n");
323    block.push_str(leading_ws);
324    block.push_str("    WHEN duplicate_object THEN\n");
325    block.push_str(leading_ws);
326    block.push_str("        RAISE NOTICE 'Role ");
327    block.push_str(&notice_name);
328    block.push_str(" already exists on target, skipping CREATE ROLE';\n");
329    block.push_str(leading_ws);
330    block.push_str("END $$;");
331
332    if !newline.is_empty() {
333        block.push_str(newline);
334    }
335
336    Some(block)
337}
338
/// Extract the role identifier that follows `CREATE ROLE` in `statement`, exactly as written.
///
/// Quoting is preserved: a quoted name is returned with its surrounding double quotes and
/// any `""` escapes intact (e.g. `"Andre Admin"`). An unquoted name ends at the first
/// whitespace or `;`.
///
/// Returns `None` when `statement` does not start with `CREATE ROLE`, when no name
/// follows, or when a quoted identifier is not closed on this line. The last case can
/// occur when a role name contains a newline and the statement continues on the next
/// line; wrapping such a fragment would produce broken SQL, so the caller should leave it
/// alone.
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if let Some(quoted) = remainder.strip_prefix('"') {
        let bytes = quoted.as_bytes();
        let mut idx = 0;
        while idx < bytes.len() {
            if bytes[idx] == b'"' {
                if bytes.get(idx + 1) == Some(&b'"') {
                    // Doubled quote is an escaped `"` inside the identifier.
                    idx += 2;
                    continue;
                }
                // Opening quote (1 byte) + body (`idx` bytes) + closing quote (1 byte).
                return Some(remainder[..idx + 2].to_string());
            }
            idx += 1;
        }
        // No closing quote on this line.
        None
    } else {
        let end = remainder
            .find(|c: char| c.is_whitespace() || c == ';')
            .unwrap_or(remainder.len());
        if end == 0 {
            None
        } else {
            Some(remainder[..end].to_string())
        }
    }
}
376
/// Convert a role token into its bare name.
///
/// A token wrapped in double quotes has the outer quotes removed and each `""` collapsed
/// to `"`. Any other token, including a lone `"` or a token quoted on only one side, is
/// returned unchanged.
fn unquote_role_name(token: &str) -> String {
    match token
        .strip_prefix('"')
        .and_then(|without_open| without_open.strip_suffix('"'))
    {
        Some(inner) => inner.replace("\"\"", "\""),
        None => token.to_string(),
    }
}
385
/// Escape `value` for use inside a single-quoted SQL string literal by doubling every `'`.
fn escape_single_quotes(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            if ch == '\'' {
                escaped.push_str("''");
            } else {
                escaped.push(ch);
            }
            escaped
        })
}
389
390/// Dump schema (DDL) for a specific database
391pub async fn dump_schema(
392    source_url: &str,
393    database: &str,
394    output_path: &str,
395    filter: &ReplicationFilter,
396) -> Result<()> {
397    tracing::info!(
398        "Dumping schema for database '{}' to {}",
399        database,
400        output_path
401    );
402
403    // Parse URL and create .pgpass file for secure authentication
404    let parts = crate::utils::parse_postgres_url(source_url)
405        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
406    let pgpass = crate::utils::PgPassFile::new(&parts)
407        .context("Failed to create .pgpass file for authentication")?;
408
409    let env_vars = parts.to_pg_env_vars();
410    let output_path_owned = output_path.to_string();
411
412    // Collect filter options
413    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
414    let include_tables = get_included_tables_for_db(filter, database);
415
416    // Wrap subprocess execution with retry logic
417    crate::utils::retry_subprocess_with_backoff(
418        || {
419            let mut cmd = Command::new("pg_dump");
420            cmd.arg("--schema-only")
421                .arg("--no-owner") // Don't include ownership commands
422                .arg("--no-privileges") // We'll handle privileges separately
423                .arg("--verbose"); // Show progress
424
425            // Add table filtering if specified
426            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
427            if let Some(ref exclude) = exclude_tables {
428                if !exclude.is_empty() {
429                    for table in exclude {
430                        cmd.arg("--exclude-table").arg(table);
431                    }
432                }
433            }
434
435            // If include_tables is specified, only dump those tables
436            if let Some(ref include) = include_tables {
437                if !include.is_empty() {
438                    for table in include {
439                        cmd.arg("--table").arg(table);
440                    }
441                }
442            }
443
444            cmd.arg("--host")
445                .arg(&parts.host)
446                .arg("--port")
447                .arg(parts.port.to_string())
448                .arg("--dbname")
449                .arg(&parts.database)
450                .arg(format!("--file={}", output_path_owned))
451                .env("PGPASSFILE", pgpass.path())
452                .stdout(Stdio::inherit())
453                .stderr(Stdio::inherit());
454
455            // Add username if specified
456            if let Some(user) = &parts.user {
457                cmd.arg("--username").arg(user);
458            }
459
460            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
461            for (env_var, value) in &env_vars {
462                cmd.env(env_var, value);
463            }
464
465            // Apply TCP keepalive parameters to prevent idle connection timeouts
466            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
467                cmd.env(env_var, value);
468            }
469
470            cmd.status().context(
471                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
472                 Install with:\n\
473                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
474                 - macOS: brew install postgresql\n\
475                 - RHEL/CentOS: sudo yum install postgresql",
476            )
477        },
478        3,                      // Max 3 retries
479        Duration::from_secs(1), // Start with 1 second delay
480        "pg_dump (dump schema)",
481    )
482    .await
483    .with_context(|| {
484        format!(
485            "pg_dump failed to dump schema for database '{}'.\n\
486             \n\
487             Common causes:\n\
488             - Database does not exist\n\
489             - Connection authentication failed\n\
490             - User lacks privileges to read database schema\n\
491             - Network connectivity issues\n\
492             - Connection timeout or network issues",
493            database
494        )
495    })?;
496
497    tracing::info!("✓ Schema dumped successfully");
498    Ok(())
499}
500
501/// Dump data for a specific database using optimized directory format
502///
503/// Uses PostgreSQL directory format dump with:
504/// - Parallel dumps for faster performance
505/// - Maximum compression (level 9)
506/// - Large object (blob) support
507/// - Directory output for efficient parallel restore
508///
509/// The number of parallel jobs is automatically determined based on available CPU cores.
510pub async fn dump_data(
511    source_url: &str,
512    database: &str,
513    output_path: &str,
514    filter: &ReplicationFilter,
515) -> Result<()> {
516    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
517    let num_cpus = std::thread::available_parallelism()
518        .map(|n| n.get().min(8))
519        .unwrap_or(4);
520
521    tracing::info!(
522        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
523        database,
524        output_path,
525        num_cpus
526    );
527
528    // Parse URL and create .pgpass file for secure authentication
529    let parts = crate::utils::parse_postgres_url(source_url)
530        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
531    let pgpass = crate::utils::PgPassFile::new(&parts)
532        .context("Failed to create .pgpass file for authentication")?;
533
534    let env_vars = parts.to_pg_env_vars();
535    let output_path_owned = output_path.to_string();
536
537    // Collect filter options
538    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
539    let include_tables = get_included_tables_for_db(filter, database);
540
541    // Wrap subprocess execution with retry logic
542    crate::utils::retry_subprocess_with_backoff(
543        || {
544            let mut cmd = Command::new("pg_dump");
545            cmd.arg("--data-only")
546                .arg("--no-owner")
547                .arg("--format=directory") // Directory format enables parallel operations
548                .arg("--blobs") // Include large objects (blobs)
549                .arg("--compress=9") // Maximum compression for smaller dump size
550                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
551                .arg("--verbose"); // Show progress
552
553            // Add table filtering if specified
554            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
555            if let Some(ref exclude) = exclude_tables {
556                if !exclude.is_empty() {
557                    for table in exclude {
558                        cmd.arg("--exclude-table-data").arg(table);
559                    }
560                }
561            }
562
563            // If include_tables is specified, only dump data for those tables
564            if let Some(ref include) = include_tables {
565                if !include.is_empty() {
566                    for table in include {
567                        cmd.arg("--table").arg(table);
568                    }
569                }
570            }
571
572            cmd.arg("--host")
573                .arg(&parts.host)
574                .arg("--port")
575                .arg(parts.port.to_string())
576                .arg("--dbname")
577                .arg(&parts.database)
578                .arg(format!("--file={}", output_path_owned))
579                .env("PGPASSFILE", pgpass.path())
580                .stdout(Stdio::inherit())
581                .stderr(Stdio::inherit());
582
583            // Add username if specified
584            if let Some(user) = &parts.user {
585                cmd.arg("--username").arg(user);
586            }
587
588            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
589            for (env_var, value) in &env_vars {
590                cmd.env(env_var, value);
591            }
592
593            // Apply TCP keepalive parameters to prevent idle connection timeouts
594            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
595                cmd.env(env_var, value);
596            }
597
598            cmd.status().context(
599                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
600                 Install with:\n\
601                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
602                 - macOS: brew install postgresql\n\
603                 - RHEL/CentOS: sudo yum install postgresql",
604            )
605        },
606        3,                      // Max 3 retries
607        Duration::from_secs(1), // Start with 1 second delay
608        "pg_dump (dump data)",
609    )
610    .await
611    .with_context(|| {
612        format!(
613            "pg_dump failed to dump data for database '{}'.\n\
614             \n\
615             Common causes:\n\
616             - Database does not exist\n\
617             - Connection authentication failed\n\
618             - User lacks privileges to read table data\n\
619             - Network connectivity issues\n\
620             - Insufficient disk space for dump directory\n\
621             - Output directory already exists (pg_dump requires non-existent path)\n\
622             - Connection timeout or network issues",
623            database
624        )
625    })?;
626
627    tracing::info!(
628        "✓ Data dumped successfully using {} parallel jobs",
629        num_cpus
630    );
631    Ok(())
632}
633
634/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
635/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
636/// (those need their schema created, just not bulk data copied)
637/// Returns schema-qualified names in format: "schema"."table"
638fn get_schema_excluded_tables_for_db(
639    filter: &ReplicationFilter,
640    db_name: &str,
641) -> Option<Vec<String>> {
642    let mut tables = BTreeSet::new();
643
644    // Handle explicit exclude_tables (format: "database.table")
645    // These tables are completely excluded (no schema, no data)
646    if let Some(explicit) = filter.exclude_tables() {
647        for full_name in explicit {
648            let parts: Vec<&str> = full_name.split('.').collect();
649            if parts.len() == 2 && parts[0] == db_name {
650                // Format as "public"."table" for consistency
651                tables.insert(format!("\"public\".\"{}\"", parts[1]));
652            }
653        }
654    }
655
656    if tables.is_empty() {
657        None
658    } else {
659        Some(tables.into_iter().collect())
660    }
661}
662
663/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
664/// Excludes explicit excludes, schema_only tables, and predicate tables
665/// (predicate tables will be copied separately with filtering)
666/// Returns schema-qualified names in format: "schema"."table"
667fn get_data_excluded_tables_for_db(
668    filter: &ReplicationFilter,
669    db_name: &str,
670) -> Option<Vec<String>> {
671    let mut tables = BTreeSet::new();
672
673    // Handle explicit exclude_tables (format: "database.table")
674    // Default to public schema for backward compatibility
675    if let Some(explicit) = filter.exclude_tables() {
676        for full_name in explicit {
677            let parts: Vec<&str> = full_name.split('.').collect();
678            if parts.len() == 2 && parts[0] == db_name {
679                // Format as "public"."table" for consistency
680                tables.insert(format!("\"public\".\"{}\"", parts[1]));
681            }
682        }
683    }
684
685    // schema_only_tables and predicate_tables already return schema-qualified names
686    for table in filter.schema_only_tables(db_name) {
687        tables.insert(table);
688    }
689
690    for (table, _) in filter.predicate_tables(db_name) {
691        tables.insert(table);
692    }
693
694    if tables.is_empty() {
695        None
696    } else {
697        Some(tables.into_iter().collect())
698    }
699}
700
701/// Extract table names for a specific database from include_tables filter
702/// Returns schema-qualified names in format: "schema"."table"
703fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
704    filter.include_tables().map(|tables| {
705        tables
706            .iter()
707            .filter_map(|full_name| {
708                let parts: Vec<&str> = full_name.split('.').collect();
709                if parts.len() == 2 && parts[0] == db_name {
710                    // Format as "public"."table" for consistency
711                    Some(format!("\"public\".\"{}\"", parts[1]))
712                } else {
713                    None
714                }
715            })
716            .collect()
717    })
718}
719
720#[cfg(test)]
721mod tests {
722    use super::*;
723    use tempfile::tempdir;
724
725    #[tokio::test]
726    #[ignore]
727    async fn test_dump_globals() {
728        let url = std::env::var("TEST_SOURCE_URL").unwrap();
729        let dir = tempdir().unwrap();
730        let output = dir.path().join("globals.sql");
731
732        let result = dump_globals(&url, output.to_str().unwrap()).await;
733
734        assert!(result.is_ok());
735        assert!(output.exists());
736
737        // Verify file contains SQL
738        let content = std::fs::read_to_string(&output).unwrap();
739        assert!(content.contains("CREATE ROLE") || !content.is_empty());
740    }
741
742    #[tokio::test]
743    #[ignore]
744    async fn test_dump_schema() {
745        let url = std::env::var("TEST_SOURCE_URL").unwrap();
746        let dir = tempdir().unwrap();
747        let output = dir.path().join("schema.sql");
748
749        // Extract database name from URL
750        let db = url.split('/').next_back().unwrap_or("postgres");
751
752        let filter = crate::filters::ReplicationFilter::empty();
753        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;
754
755        assert!(result.is_ok());
756        assert!(output.exists());
757    }
758
759    #[test]
760    fn test_get_schema_excluded_tables_for_db() {
761        let filter = crate::filters::ReplicationFilter::new(
762            None,
763            None,
764            None,
765            Some(vec![
766                "db1.table1".to_string(),
767                "db1.table2".to_string(),
768                "db2.table3".to_string(),
769            ]),
770        )
771        .unwrap();
772
773        // Schema exclusion only includes explicit exclude_tables
774        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
775        // Should return schema-qualified names
776        assert_eq!(
777            tables,
778            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
779        );
780
781        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
782        assert_eq!(tables, vec!["\"public\".\"table3\""]);
783
784        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
785        assert!(tables.is_none() || tables.unwrap().is_empty());
786    }
787
788    #[test]
789    fn test_get_data_excluded_tables_for_db() {
790        let filter = crate::filters::ReplicationFilter::new(
791            None,
792            None,
793            None,
794            Some(vec![
795                "db1.table1".to_string(),
796                "db1.table2".to_string(),
797                "db2.table3".to_string(),
798            ]),
799        )
800        .unwrap();
801
802        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
803        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
804        // Should return schema-qualified names
805        assert_eq!(
806            tables,
807            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
808        );
809
810        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
811        assert_eq!(tables, vec!["\"public\".\"table3\""]);
812
813        let tables = get_data_excluded_tables_for_db(&filter, "db3");
814        assert!(tables.is_none() || tables.unwrap().is_empty());
815    }
816
817    #[test]
818    fn test_get_included_tables_for_db() {
819        let filter = crate::filters::ReplicationFilter::new(
820            None,
821            None,
822            Some(vec![
823                "db1.users".to_string(),
824                "db1.orders".to_string(),
825                "db2.products".to_string(),
826            ]),
827            None,
828        )
829        .unwrap();
830
831        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
832        // Should return schema-qualified names in original order
833        assert_eq!(
834            tables,
835            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
836        );
837
838        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
839        assert_eq!(tables, vec!["\"public\".\"products\""]);
840
841        let tables = get_included_tables_for_db(&filter, "db3");
842        assert!(tables.is_none() || tables.unwrap().is_empty());
843    }
844
845    #[test]
846    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
847        let filter = crate::filters::ReplicationFilter::empty();
848        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
849        assert!(tables.is_none());
850    }
851
852    #[test]
853    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
854        let filter = crate::filters::ReplicationFilter::empty();
855        let tables = get_data_excluded_tables_for_db(&filter, "db1");
856        assert!(tables.is_none());
857    }
858
859    #[test]
860    fn test_get_included_tables_for_db_with_empty_filter() {
861        let filter = crate::filters::ReplicationFilter::empty();
862        let tables = get_included_tables_for_db(&filter, "db1");
863        assert!(tables.is_none());
864    }
865
866    #[test]
867    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
868        let sql = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
869        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");
870
871        assert!(rewritten.contains("DO $$"));
872        assert!(rewritten.contains("Role replicator already exists"));
873        assert!(rewritten.contains("CREATE ROLE replicator WITH LOGIN;"));
874        assert!(rewritten.contains("ALTER ROLE replicator WITH LOGIN;"));
875    }
876
877    #[test]
878    fn test_rewrite_create_role_statements_wraps_quoted_role() {
879        let sql = "    CREATE ROLE \"Andre Admin\";\n";
880        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");
881
882        assert!(rewritten.contains("DO $$"));
883        assert!(rewritten.contains("Andre Admin already exists"));
884        assert!(rewritten.contains("CREATE ROLE \"Andre Admin\""));
885        assert!(rewritten.starts_with("    DO $$"));
886    }
887
888    #[test]
889    fn test_rewrite_create_role_statements_noop_when_absent() {
890        let sql = "ALTER ROLE existing WITH LOGIN;\n";
891        assert!(rewrite_create_role_statements(sql).is_none());
892    }
893
894    #[test]
895    fn test_remove_restricted_role_grants() {
896        let dir = tempdir().unwrap();
897        let globals_file = dir.path().join("globals.sql");
898
899        // Write a sample globals dump with restricted role grants
900        let content = r#"CREATE ROLE myuser;
901ALTER ROLE myuser WITH LOGIN;
902GRANT pg_checkpoint TO myuser;
903GRANT "pg_read_all_stats" TO myuser;
904GRANT pg_monitor TO myuser;
905GRANT myrole TO myuser;
906GRANT SELECT ON TABLE users TO myuser;
907GRANT rds_superuser TO myuser GRANTED BY rdsadmin;
908GRANT ALL ON SCHEMA public TO myuser GRANTED BY "rdsadmin";
909GRANT SELECT ON TABLE orders TO myuser GRANTED BY postgres;
910"#;
911        std::fs::write(&globals_file, content).unwrap();
912
913        // Run the sanitization
914        remove_restricted_role_grants(globals_file.to_str().unwrap()).unwrap();
915
916        // Verify restricted grants are commented out
917        let result = std::fs::read_to_string(&globals_file).unwrap();
918
919        // Restricted role grants should be commented out
920        assert!(result.contains("-- GRANT pg_checkpoint TO myuser;"));
921        assert!(result.contains("-- GRANT \"pg_read_all_stats\" TO myuser;"));
922        assert!(result.contains("-- GRANT pg_monitor TO myuser;"));
923
924        // GRANTED BY rdsadmin clauses should be commented out
925        assert!(result.contains("-- GRANT rds_superuser TO myuser GRANTED BY rdsadmin;"));
926        assert!(result.contains("-- GRANT ALL ON SCHEMA public TO myuser GRANTED BY \"rdsadmin\";"));
927
928        // Non-restricted grants should remain
929        assert!(result.contains("\nGRANT myrole TO myuser;\n"));
930        assert!(result.contains("\nGRANT SELECT ON TABLE users TO myuser;\n"));
931        assert!(result.contains("\nGRANT SELECT ON TABLE orders TO myuser GRANTED BY postgres;\n"));
932
933        // Other statements should remain unchanged
934        assert!(result.contains("CREATE ROLE myuser;"));
935        assert!(result.contains("ALTER ROLE myuser WITH LOGIN;"));
936    }
937}