database_replicator/migration/
dump.rs

1// ABOUTME: Wrapper for pg_dump command to export database objects
2// ABOUTME: Handles global objects, schema, and data export
3
4use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11/// Dump global objects (roles, tablespaces) using pg_dumpall
12pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13    tracing::info!("Dumping global objects to {}", output_path);
14
15    // Parse URL and create .pgpass file for secure authentication
16    let parts = crate::utils::parse_postgres_url(source_url)
17        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18    let pgpass = crate::utils::PgPassFile::new(&parts)
19        .context("Failed to create .pgpass file for authentication")?;
20
21    let env_vars = parts.to_pg_env_vars();
22    let output_path_owned = output_path.to_string();
23
24    // Wrap subprocess execution with retry logic
25    crate::utils::retry_subprocess_with_backoff(
26        || {
27            let mut cmd = Command::new("pg_dumpall");
28            cmd.arg("--globals-only")
29                .arg("--no-role-passwords") // Don't dump passwords
30                .arg("--verbose") // Show progress
31                .arg("--host")
32                .arg(&parts.host)
33                .arg("--port")
34                .arg(parts.port.to_string())
35                .arg("--database")
36                .arg(&parts.database)
37                .arg(format!("--file={}", output_path_owned))
38                .env("PGPASSFILE", pgpass.path())
39                .stdout(Stdio::inherit())
40                .stderr(Stdio::inherit());
41
42            // Add username if specified
43            if let Some(user) = &parts.user {
44                cmd.arg("--username").arg(user);
45            }
46
47            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
48            for (env_var, value) in &env_vars {
49                cmd.env(env_var, value);
50            }
51
52            // Apply TCP keepalive parameters to prevent idle connection timeouts
53            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54                cmd.env(env_var, value);
55            }
56
57            cmd.status().context(
58                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59                 Install with:\n\
60                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61                 - macOS: brew install postgresql\n\
62                 - RHEL/CentOS: sudo yum install postgresql",
63            )
64        },
65        3,                      // Max 3 retries
66        Duration::from_secs(1), // Start with 1 second delay
67        "pg_dumpall (dump globals)",
68    )
69    .await
70    .context(
71        "pg_dumpall failed to dump global objects.\n\
72         \n\
73         Common causes:\n\
74         - Connection authentication failed\n\
75         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76         - Network connectivity issues\n\
77         - Invalid connection string\n\
78         - Connection timeout or network issues",
79    )?;
80
81    tracing::info!("✓ Global objects dumped successfully");
82    Ok(())
83}
84
85/// Update a globals dump so duplicate role creation errors become harmless notices.
86///
87/// `pg_dumpall --globals-only` emits `CREATE ROLE` statements that fail if the
88/// role already exists on the target cluster. When an operator reruns
89/// replication against the same target, those statements cause `psql` to exit
90/// with status 3 which previously triggered noisy retries and prevented the rest
91/// of the globals from being applied. By wrapping each `CREATE ROLE` statement
92/// in a `DO $$ ... EXCEPTION WHEN duplicate_object` block, we allow Postgres to
93/// skip recreating existing roles while still applying subsequent `ALTER ROLE`
94/// and `GRANT` statements.
95pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96    let content = fs::read_to_string(path)
97        .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99    if let Some(updated) = rewrite_create_role_statements(&content) {
100        fs::write(path, updated)
101            .with_context(|| format!("Failed to update globals dump at {}", path))?;
102    }
103
104    Ok(())
105}
106
107/// Comments out `ALTER ROLE ... SUPERUSER` statements in a globals dump file.
108///
109/// Managed Postgres services (e.g., AWS RDS) often prevent the restore user
110/// from granting SUPERUSER. Commenting out those lines keeps the restore
111/// moving without permission errors.
112pub fn remove_superuser_from_globals(path: &str) -> Result<()> {
113    let content = fs::read_to_string(path)
114        .with_context(|| format!("Failed to read globals dump at {}", path))?;
115
116    let mut updated = String::with_capacity(content.len());
117    let mut modified = false;
118    for line in content.lines() {
119        if line.contains("ALTER ROLE") && line.contains("SUPERUSER") {
120            updated.push_str("-- ");
121            updated.push_str(line);
122            updated.push('\n');
123            modified = true;
124        } else {
125            updated.push_str(line);
126            updated.push('\n');
127        }
128    }
129
130    if modified {
131        fs::write(path, updated)
132            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
133    }
134
135    Ok(())
136}
137
138/// Removes parameter settings that require superuser privileges (e.g. `log_statement`).
139///
140/// AWS RDS prevents standard replication roles from altering certain GUCs via
141/// `ALTER ROLE ... SET`. Each offending line is commented out so `psql` skips
142/// them without aborting the rest of the globals restore.
143pub fn remove_restricted_guc_settings(path: &str) -> Result<()> {
144    let content = fs::read_to_string(path)
145        .with_context(|| format!("Failed to read globals dump at {}", path))?;
146
147    let mut updated = String::with_capacity(content.len());
148    let mut modified = false;
149
150    for line in content.lines() {
151        let lower_line = line.to_ascii_lowercase();
152        if lower_line.contains("alter role") && lower_line.contains("set") {
153            updated.push_str("-- ");
154            updated.push_str(line);
155            updated.push('\n');
156            modified = true;
157        } else {
158            updated.push_str(line);
159            updated.push('\n');
160        }
161    }
162
163    if modified {
164        fs::write(path, updated)
165            .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
166    }
167
168    Ok(())
169}
170
/// Wraps every `CREATE ROLE` line of `sql` in a `DO $$ ... $$` block that turns a
/// `duplicate_object` error into a notice.
///
/// The input is processed line by line; lines that are not `CREATE ROLE` statements
/// are copied through unchanged, including their line terminator. Lines inside an
/// existing multi-line `DO $$` block are never rewritten, which makes the function
/// idempotent: running it on its own output yields `None` instead of nesting `$$`
/// quotes.
///
/// Returns `Some(rewritten)` when at least one statement was wrapped and `None`
/// when the input is empty or nothing needed to change.
fn rewrite_create_role_statements(sql: &str) -> Option<String> {
    if sql.is_empty() {
        return None;
    }

    let mut output = String::with_capacity(sql.len() + 1024);
    let mut modified = false;
    // True while scanning the body of a `DO $$` block (typically one produced by an
    // earlier pass); such a body already holds its guarded CREATE ROLE.
    let mut inside_do_block = false;

    // `split_inclusive` keeps each line's terminator attached and yields a final
    // unterminated line as-is.
    for chunk in sql.split_inclusive('\n') {
        let stripped = chunk.trim();

        if inside_do_block {
            // The next `$$` closes the dollar-quoted body (e.g. `END $$;`).
            if stripped.contains("$$") {
                inside_do_block = false;
            }
            output.push_str(chunk);
            continue;
        }

        if stripped == "DO $$" {
            inside_do_block = true;
            output.push_str(chunk);
            continue;
        }

        match wrap_create_role_line(chunk) {
            Some(block) => {
                output.push_str(&block);
                modified = true;
            }
            None => output.push_str(chunk),
        }
    }

    if modified {
        Some(output)
    } else {
        None
    }
}

/// Builds the guarded `DO $$` block for a single `CREATE ROLE` line.
///
/// `chunk` is one line, optionally including its `\n` / `\r\n` terminator. The
/// line's leading whitespace is repeated in front of every generated line and the
/// original terminator is appended after `END $$;`.
///
/// Returns `None` when the line does not start with `CREATE ROLE ` (after leading
/// whitespace) or when no role name can be extracted from it.
fn wrap_create_role_line(chunk: &str) -> Option<String> {
    let trimmed = chunk.trim_start();
    if !trimmed.starts_with("CREATE ROLE ") {
        return None;
    }

    let leading_ws = &chunk[..chunk.len() - trimmed.len()];
    let newline = if chunk.ends_with("\r\n") {
        "\r\n"
    } else if chunk.ends_with('\n') {
        "\n"
    } else {
        ""
    };

    // Statement text without trailing whitespace and semicolon(s); a single `;`
    // is re-added when the block is assembled.
    let statement_body = trimmed.trim_end().trim_end_matches(';').trim_end();
    let role_token = extract_role_token(statement_body)?;

    // The name is embedded in a single-quoted PL/pgSQL RAISE format string:
    // `'` must be doubled, and `%` must be doubled too because RAISE treats a
    // lone `%` as a parameter placeholder.
    let notice_name = escape_single_quotes(&unquote_role_name(&role_token)).replace('%', "%%");

    let create_line = format!("    {};", statement_body);
    let notice_line = format!(
        "        RAISE NOTICE 'Role {} already exists on target, skipping CREATE ROLE';",
        notice_name
    );
    let body_lines = [
        "DO $$",
        "BEGIN",
        create_line.as_str(),
        "EXCEPTION",
        "    WHEN duplicate_object THEN",
        notice_line.as_str(),
    ];

    let mut block = String::with_capacity(chunk.len() + 128);
    for line in body_lines.iter() {
        block.push_str(leading_ws);
        block.push_str(line);
        block.push('\n');
    }
    block.push_str(leading_ws);
    block.push_str("END $$;");
    block.push_str(newline);

    Some(block)
}

/// Extracts the role-name token that follows `CREATE ROLE` in `statement`.
///
/// A double-quoted identifier is returned with its quotes (a `""` pair inside it
/// is an escaped quote); an unquoted name ends at the first whitespace or `;`.
/// Returns `None` when the prefix is missing, the name is empty, or a quoted
/// identifier is never closed.
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if let Some(quoted) = remainder.strip_prefix('"') {
        let mut chars = quoted.char_indices().peekable();
        while let Some((idx, ch)) = chars.next() {
            if ch != '"' {
                continue;
            }
            if matches!(chars.peek(), Some((_, '"'))) {
                chars.next(); // skip the second half of an escaped `""`
                continue;
            }
            // `idx` is relative to `quoted`: +1 for the opening quote and +1 to
            // include the closing quote.
            return Some(remainder[..idx + 2].to_string());
        }
        None
    } else {
        let end = remainder
            .find(|c: char| c.is_whitespace() || c == ';')
            .unwrap_or(remainder.len());
        if end == 0 {
            None
        } else {
            Some(remainder[..end].to_string())
        }
    }
}

/// Removes the surrounding double quotes from a role token and collapses `""`
/// into `"`. Tokens that are not fully quoted are returned unchanged.
fn unquote_role_name(token: &str) -> String {
    match token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
    {
        Some(inner) => inner.replace("\"\"", "\""),
        None => token.to_string(),
    }
}

/// Doubles every single quote so `value` can sit inside a single-quoted SQL literal.
fn escape_single_quotes(value: &str) -> String {
    value.replace('\'', "''")
}
309
310/// Dump schema (DDL) for a specific database
311pub async fn dump_schema(
312    source_url: &str,
313    database: &str,
314    output_path: &str,
315    filter: &ReplicationFilter,
316) -> Result<()> {
317    tracing::info!(
318        "Dumping schema for database '{}' to {}",
319        database,
320        output_path
321    );
322
323    // Parse URL and create .pgpass file for secure authentication
324    let parts = crate::utils::parse_postgres_url(source_url)
325        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
326    let pgpass = crate::utils::PgPassFile::new(&parts)
327        .context("Failed to create .pgpass file for authentication")?;
328
329    let env_vars = parts.to_pg_env_vars();
330    let output_path_owned = output_path.to_string();
331
332    // Collect filter options
333    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
334    let include_tables = get_included_tables_for_db(filter, database);
335
336    // Wrap subprocess execution with retry logic
337    crate::utils::retry_subprocess_with_backoff(
338        || {
339            let mut cmd = Command::new("pg_dump");
340            cmd.arg("--schema-only")
341                .arg("--no-owner") // Don't include ownership commands
342                .arg("--no-privileges") // We'll handle privileges separately
343                .arg("--verbose"); // Show progress
344
345            // Add table filtering if specified
346            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
347            if let Some(ref exclude) = exclude_tables {
348                if !exclude.is_empty() {
349                    for table in exclude {
350                        cmd.arg("--exclude-table").arg(table);
351                    }
352                }
353            }
354
355            // If include_tables is specified, only dump those tables
356            if let Some(ref include) = include_tables {
357                if !include.is_empty() {
358                    for table in include {
359                        cmd.arg("--table").arg(table);
360                    }
361                }
362            }
363
364            cmd.arg("--host")
365                .arg(&parts.host)
366                .arg("--port")
367                .arg(parts.port.to_string())
368                .arg("--dbname")
369                .arg(&parts.database)
370                .arg(format!("--file={}", output_path_owned))
371                .env("PGPASSFILE", pgpass.path())
372                .stdout(Stdio::inherit())
373                .stderr(Stdio::inherit());
374
375            // Add username if specified
376            if let Some(user) = &parts.user {
377                cmd.arg("--username").arg(user);
378            }
379
380            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
381            for (env_var, value) in &env_vars {
382                cmd.env(env_var, value);
383            }
384
385            // Apply TCP keepalive parameters to prevent idle connection timeouts
386            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
387                cmd.env(env_var, value);
388            }
389
390            cmd.status().context(
391                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
392                 Install with:\n\
393                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
394                 - macOS: brew install postgresql\n\
395                 - RHEL/CentOS: sudo yum install postgresql",
396            )
397        },
398        3,                      // Max 3 retries
399        Duration::from_secs(1), // Start with 1 second delay
400        "pg_dump (dump schema)",
401    )
402    .await
403    .with_context(|| {
404        format!(
405            "pg_dump failed to dump schema for database '{}'.\n\
406             \n\
407             Common causes:\n\
408             - Database does not exist\n\
409             - Connection authentication failed\n\
410             - User lacks privileges to read database schema\n\
411             - Network connectivity issues\n\
412             - Connection timeout or network issues",
413            database
414        )
415    })?;
416
417    tracing::info!("✓ Schema dumped successfully");
418    Ok(())
419}
420
421/// Dump data for a specific database using optimized directory format
422///
423/// Uses PostgreSQL directory format dump with:
424/// - Parallel dumps for faster performance
425/// - Maximum compression (level 9)
426/// - Large object (blob) support
427/// - Directory output for efficient parallel restore
428///
429/// The number of parallel jobs is automatically determined based on available CPU cores.
430pub async fn dump_data(
431    source_url: &str,
432    database: &str,
433    output_path: &str,
434    filter: &ReplicationFilter,
435) -> Result<()> {
436    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
437    let num_cpus = std::thread::available_parallelism()
438        .map(|n| n.get().min(8))
439        .unwrap_or(4);
440
441    tracing::info!(
442        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
443        database,
444        output_path,
445        num_cpus
446    );
447
448    // Parse URL and create .pgpass file for secure authentication
449    let parts = crate::utils::parse_postgres_url(source_url)
450        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
451    let pgpass = crate::utils::PgPassFile::new(&parts)
452        .context("Failed to create .pgpass file for authentication")?;
453
454    let env_vars = parts.to_pg_env_vars();
455    let output_path_owned = output_path.to_string();
456
457    // Collect filter options
458    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
459    let include_tables = get_included_tables_for_db(filter, database);
460
461    // Wrap subprocess execution with retry logic
462    crate::utils::retry_subprocess_with_backoff(
463        || {
464            let mut cmd = Command::new("pg_dump");
465            cmd.arg("--data-only")
466                .arg("--no-owner")
467                .arg("--format=directory") // Directory format enables parallel operations
468                .arg("--blobs") // Include large objects (blobs)
469                .arg("--compress=9") // Maximum compression for smaller dump size
470                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
471                .arg("--verbose"); // Show progress
472
473            // Add table filtering if specified
474            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
475            if let Some(ref exclude) = exclude_tables {
476                if !exclude.is_empty() {
477                    for table in exclude {
478                        cmd.arg("--exclude-table-data").arg(table);
479                    }
480                }
481            }
482
483            // If include_tables is specified, only dump data for those tables
484            if let Some(ref include) = include_tables {
485                if !include.is_empty() {
486                    for table in include {
487                        cmd.arg("--table").arg(table);
488                    }
489                }
490            }
491
492            cmd.arg("--host")
493                .arg(&parts.host)
494                .arg("--port")
495                .arg(parts.port.to_string())
496                .arg("--dbname")
497                .arg(&parts.database)
498                .arg(format!("--file={}", output_path_owned))
499                .env("PGPASSFILE", pgpass.path())
500                .stdout(Stdio::inherit())
501                .stderr(Stdio::inherit());
502
503            // Add username if specified
504            if let Some(user) = &parts.user {
505                cmd.arg("--username").arg(user);
506            }
507
508            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
509            for (env_var, value) in &env_vars {
510                cmd.env(env_var, value);
511            }
512
513            // Apply TCP keepalive parameters to prevent idle connection timeouts
514            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
515                cmd.env(env_var, value);
516            }
517
518            cmd.status().context(
519                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
520                 Install with:\n\
521                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
522                 - macOS: brew install postgresql\n\
523                 - RHEL/CentOS: sudo yum install postgresql",
524            )
525        },
526        3,                      // Max 3 retries
527        Duration::from_secs(1), // Start with 1 second delay
528        "pg_dump (dump data)",
529    )
530    .await
531    .with_context(|| {
532        format!(
533            "pg_dump failed to dump data for database '{}'.\n\
534             \n\
535             Common causes:\n\
536             - Database does not exist\n\
537             - Connection authentication failed\n\
538             - User lacks privileges to read table data\n\
539             - Network connectivity issues\n\
540             - Insufficient disk space for dump directory\n\
541             - Output directory already exists (pg_dump requires non-existent path)\n\
542             - Connection timeout or network issues",
543            database
544        )
545    })?;
546
547    tracing::info!(
548        "✓ Data dumped successfully using {} parallel jobs",
549        num_cpus
550    );
551    Ok(())
552}
553
554/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
555/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
556/// (those need their schema created, just not bulk data copied)
557/// Returns schema-qualified names in format: "schema"."table"
558fn get_schema_excluded_tables_for_db(
559    filter: &ReplicationFilter,
560    db_name: &str,
561) -> Option<Vec<String>> {
562    let mut tables = BTreeSet::new();
563
564    // Handle explicit exclude_tables (format: "database.table")
565    // These tables are completely excluded (no schema, no data)
566    if let Some(explicit) = filter.exclude_tables() {
567        for full_name in explicit {
568            let parts: Vec<&str> = full_name.split('.').collect();
569            if parts.len() == 2 && parts[0] == db_name {
570                // Format as "public"."table" for consistency
571                tables.insert(format!("\"public\".\"{}\"", parts[1]));
572            }
573        }
574    }
575
576    if tables.is_empty() {
577        None
578    } else {
579        Some(tables.into_iter().collect())
580    }
581}
582
583/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
584/// Excludes explicit excludes, schema_only tables, and predicate tables
585/// (predicate tables will be copied separately with filtering)
586/// Returns schema-qualified names in format: "schema"."table"
587fn get_data_excluded_tables_for_db(
588    filter: &ReplicationFilter,
589    db_name: &str,
590) -> Option<Vec<String>> {
591    let mut tables = BTreeSet::new();
592
593    // Handle explicit exclude_tables (format: "database.table")
594    // Default to public schema for backward compatibility
595    if let Some(explicit) = filter.exclude_tables() {
596        for full_name in explicit {
597            let parts: Vec<&str> = full_name.split('.').collect();
598            if parts.len() == 2 && parts[0] == db_name {
599                // Format as "public"."table" for consistency
600                tables.insert(format!("\"public\".\"{}\"", parts[1]));
601            }
602        }
603    }
604
605    // schema_only_tables and predicate_tables already return schema-qualified names
606    for table in filter.schema_only_tables(db_name) {
607        tables.insert(table);
608    }
609
610    for (table, _) in filter.predicate_tables(db_name) {
611        tables.insert(table);
612    }
613
614    if tables.is_empty() {
615        None
616    } else {
617        Some(tables.into_iter().collect())
618    }
619}
620
621/// Extract table names for a specific database from include_tables filter
622/// Returns schema-qualified names in format: "schema"."table"
623fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
624    filter.include_tables().map(|tables| {
625        tables
626            .iter()
627            .filter_map(|full_name| {
628                let parts: Vec<&str> = full_name.split('.').collect();
629                if parts.len() == 2 && parts[0] == db_name {
630                    // Format as "public"."table" for consistency
631                    Some(format!("\"public\".\"{}\"", parts[1]))
632                } else {
633                    None
634                }
635            })
636            .collect()
637    })
638}
639
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Integration test: needs `TEST_SOURCE_URL` and a reachable server, hence `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let result = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(result.is_ok());
        assert!(output.exists());

        // The dump must contain some SQL. (The previous
        // `contains("CREATE ROLE") || !is_empty()` check reduced to exactly this.)
        let content = std::fs::read_to_string(&output).unwrap();
        assert!(!content.is_empty());
    }

    /// Integration test: needs `TEST_SOURCE_URL` and a reachable server, hence `#[ignore]`.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        // Database name = last path segment of the URL, without any `?query` suffix
        // (e.g. `?sslmode=require`); fall back to `postgres` when it is empty.
        let db = url
            .rsplit('/')
            .next()
            .and_then(|tail| tail.split('?').next())
            .filter(|name| !name.is_empty())
            .unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(result.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Schema exclusion only includes explicit exclude_tables
        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names in original order
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let sql = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Role replicator already exists"));
        assert!(rewritten.contains("CREATE ROLE replicator WITH LOGIN;"));
        assert!(rewritten.contains("ALTER ROLE replicator WITH LOGIN;"));
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let sql = "    CREATE ROLE \"Andre Admin\";\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Andre Admin already exists"));
        assert!(rewritten.contains("CREATE ROLE \"Andre Admin\""));
        assert!(rewritten.starts_with("    DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        let sql = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(sql).is_none());
    }

    /// The terminator of each rewritten line is carried over to the end of its block,
    /// and a final line without a terminator stays unterminated.
    #[test]
    fn test_rewrite_create_role_statements_preserves_line_endings() {
        let sql = "CREATE ROLE a;\r\nCREATE ROLE b";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        assert!(rewritten.contains("END $$;\r\n"));
        assert!(rewritten.ends_with("END $$;"));
    }
}