database_replicator/migration/
dump.rs

1// ABOUTME: Wrapper for pg_dump command to export database objects
2// ABOUTME: Handles global objects, schema, and data export
3
4use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11/// Dump global objects (roles, tablespaces) using pg_dumpall
12pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13    tracing::info!("Dumping global objects to {}", output_path);
14
15    // Parse URL and create .pgpass file for secure authentication
16    let parts = crate::utils::parse_postgres_url(source_url)
17        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18    let pgpass = crate::utils::PgPassFile::new(&parts)
19        .context("Failed to create .pgpass file for authentication")?;
20
21    let env_vars = parts.to_pg_env_vars();
22    let output_path_owned = output_path.to_string();
23
24    // Wrap subprocess execution with retry logic
25    crate::utils::retry_subprocess_with_backoff(
26        || {
27            let mut cmd = Command::new("pg_dumpall");
28            cmd.arg("--globals-only")
29                .arg("--no-role-passwords") // Don't dump passwords
30                .arg("--verbose") // Show progress
31                .arg("--host")
32                .arg(&parts.host)
33                .arg("--port")
34                .arg(parts.port.to_string())
35                .arg("--database")
36                .arg(&parts.database)
37                .arg(format!("--file={}", output_path_owned))
38                .env("PGPASSFILE", pgpass.path())
39                .stdout(Stdio::inherit())
40                .stderr(Stdio::inherit());
41
42            // Add username if specified
43            if let Some(user) = &parts.user {
44                cmd.arg("--username").arg(user);
45            }
46
47            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
48            for (env_var, value) in &env_vars {
49                cmd.env(env_var, value);
50            }
51
52            // Apply TCP keepalive parameters to prevent idle connection timeouts
53            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54                cmd.env(env_var, value);
55            }
56
57            cmd.status().context(
58                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59                 Install with:\n\
60                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61                 - macOS: brew install postgresql\n\
62                 - RHEL/CentOS: sudo yum install postgresql",
63            )
64        },
65        3,                      // Max 3 retries
66        Duration::from_secs(1), // Start with 1 second delay
67        "pg_dumpall (dump globals)",
68    )
69    .await
70    .context(
71        "pg_dumpall failed to dump global objects.\n\
72         \n\
73         Common causes:\n\
74         - Connection authentication failed\n\
75         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76         - Network connectivity issues\n\
77         - Invalid connection string\n\
78         - Connection timeout or network issues",
79    )?;
80
81    tracing::info!("✓ Global objects dumped successfully");
82    Ok(())
83}
84
85/// Update a globals dump so duplicate role creation errors become harmless notices.
86///
87/// `pg_dumpall --globals-only` emits `CREATE ROLE` statements that fail if the
88/// role already exists on the target cluster. When an operator reruns
89/// replication against the same target, those statements cause `psql` to exit
90/// with status 3 which previously triggered noisy retries and prevented the rest
91/// of the globals from being applied. By wrapping each `CREATE ROLE` statement
92/// in a `DO $$ ... EXCEPTION WHEN duplicate_object` block, we allow Postgres to
93/// skip recreating existing roles while still applying subsequent `ALTER ROLE`
94/// and `GRANT` statements.
95pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96    let content = fs::read_to_string(path)
97        .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99    if let Some(updated) = rewrite_create_role_statements(&content) {
100        fs::write(path, updated)
101            .with_context(|| format!("Failed to update globals dump at {}", path))?;
102    }
103
104    Ok(())
105}
106
107fn rewrite_create_role_statements(sql: &str) -> Option<String> {
108    if sql.is_empty() {
109        return None;
110    }
111
112    let mut output = String::with_capacity(sql.len() + 1024);
113    let mut modified = false;
114    let mut cursor = 0;
115
116    while cursor < sql.len() {
117        if let Some(rel_pos) = sql[cursor..].find('\n') {
118            let end = cursor + rel_pos + 1;
119            let chunk = &sql[cursor..end];
120            if let Some(transformed) = wrap_create_role_line(chunk) {
121                output.push_str(&transformed);
122                modified = true;
123            } else {
124                output.push_str(chunk);
125            }
126            cursor = end;
127        } else {
128            let chunk = &sql[cursor..];
129            if let Some(transformed) = wrap_create_role_line(chunk) {
130                output.push_str(&transformed);
131                modified = true;
132            } else {
133                output.push_str(chunk);
134            }
135            break;
136        }
137    }
138
139    if modified {
140        Some(output)
141    } else {
142        None
143    }
144}
145
146fn wrap_create_role_line(chunk: &str) -> Option<String> {
147    let trimmed = chunk.trim_start();
148    if !trimmed.starts_with("CREATE ROLE ") {
149        return None;
150    }
151
152    let statement = trimmed.trim_end();
153    let statement_body = statement.trim_end_matches(';').trim_end();
154    let leading_ws_len = chunk.len() - trimmed.len();
155    let leading_ws = &chunk[..leading_ws_len];
156    let newline = if chunk.ends_with("\r\n") {
157        "\r\n"
158    } else if chunk.ends_with('\n') {
159        "\n"
160    } else {
161        ""
162    };
163
164    let role_token = extract_role_token(statement_body)?;
165
166    let notice_name = escape_single_quotes(&unquote_role_name(&role_token));
167
168    let mut block = String::with_capacity(chunk.len() + 128);
169    block.push_str(leading_ws);
170    block.push_str("DO $$\n");
171    block.push_str(leading_ws);
172    block.push_str("BEGIN\n");
173    block.push_str(leading_ws);
174    block.push_str("    ");
175    block.push_str(statement_body);
176    block.push_str(";\n");
177    block.push_str(leading_ws);
178    block.push_str("EXCEPTION\n");
179    block.push_str(leading_ws);
180    block.push_str("    WHEN duplicate_object THEN\n");
181    block.push_str(leading_ws);
182    block.push_str("        RAISE NOTICE 'Role ");
183    block.push_str(&notice_name);
184    block.push_str(" already exists on target, skipping CREATE ROLE';\n");
185    block.push_str(leading_ws);
186    block.push_str("END $$;");
187
188    if !newline.is_empty() {
189        block.push_str(newline);
190    }
191
192    Some(block)
193}
194
/// Extract the role name token that follows `CREATE ROLE` in `statement`.
///
/// Handles quoted identifiers (`"Mixed Case"`, with `""` as the embedded-quote
/// escape) and unquoted identifiers (terminated by whitespace or `;`).
/// Returns `None` when the statement does not start with `CREATE ROLE`, the
/// name is missing, or a quoted identifier is unterminated. (Previously an
/// unterminated quote fell through a tautological `idx <= remainder.len()`
/// check and the malformed fragment was returned as if it were a valid token.)
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if let Some(rest) = remainder.strip_prefix('"') {
        // Scan for the closing quote, skipping doubled-quote escapes.
        // Byte scanning is safe: '"' is ASCII and never appears inside a
        // multi-byte UTF-8 sequence.
        let bytes = rest.as_bytes();
        let mut idx = 0;
        while idx < bytes.len() {
            if bytes[idx] == b'"' {
                if bytes.get(idx + 1) == Some(&b'"') {
                    idx += 2; // escaped quote inside the identifier
                    continue;
                }
                // +2: account for the opening quote stripped off `remainder`
                // and include the closing quote itself.
                return Some(remainder[..idx + 2].to_string());
            }
            idx += 1;
        }
        None // unterminated quoted identifier — refuse to guess
    } else {
        // Unquoted identifier: runs until whitespace or ';'.
        let end = remainder
            .find(|ch: char| ch.is_whitespace() || ch == ';')
            .unwrap_or(remainder.len());
        if end == 0 {
            None
        } else {
            Some(remainder[..end].to_string())
        }
    }
}
232
/// Strip surrounding double quotes from a role identifier and collapse the
/// `""` escape sequence; tokens that are not fully quoted come back unchanged.
fn unquote_role_name(token: &str) -> String {
    match token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
    {
        Some(inner) => inner.replace("\"\"", "\""),
        None => token.to_string(),
    }
}
241
/// Double every single quote so `value` can be embedded in a SQL string literal.
fn escape_single_quotes(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
245
/// Dump schema (DDL) for a specific database using `pg_dump --schema-only`.
///
/// # Arguments
/// * `source_url` - PostgreSQL connection URL. NOTE(review): the dump connects
///   to the database named in this URL (`parts.database`); the `database`
///   argument is used only for log messages and filter lookups — callers
///   appear to be expected to pass a URL that already points at `database`.
///   Confirm against call sites.
/// * `database` - Logical database name, used for logging and to resolve the
///   include/exclude table filters.
/// * `output_path` - Destination file, passed to pg_dump via `--file=`.
/// * `filter` - Replication filter; explicit excludes become `--exclude-table`
///   flags, includes become `--table` flags.
///
/// # Errors
/// Fails when the URL cannot be parsed, the .pgpass file cannot be created,
/// or pg_dump still fails after the retry budget (3 attempts) is exhausted.
pub async fn dump_schema(
    source_url: &str,
    database: &str,
    output_path: &str,
    filter: &ReplicationFilter,
) -> Result<()> {
    tracing::info!(
        "Dumping schema for database '{}' to {}",
        database,
        output_path
    );

    // Parse URL and create .pgpass file for secure authentication
    // (keeps the password off the command line / process list)
    let parts = crate::utils::parse_postgres_url(source_url)
        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
    let pgpass = crate::utils::PgPassFile::new(&parts)
        .context("Failed to create .pgpass file for authentication")?;

    let env_vars = parts.to_pg_env_vars();
    // Owned copy so the retry closure can be re-invoked without borrowing issues
    let output_path_owned = output_path.to_string();

    // Collect filter options (resolved once, outside the retry closure)
    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
    let include_tables = get_included_tables_for_db(filter, database);

    // Wrap subprocess execution with retry logic
    crate::utils::retry_subprocess_with_backoff(
        || {
            let mut cmd = Command::new("pg_dump");
            cmd.arg("--schema-only")
                .arg("--no-owner") // Don't include ownership commands
                .arg("--no-privileges") // We'll handle privileges separately
                .arg("--verbose"); // Show progress

            // Add table filtering if specified
            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
            if let Some(ref exclude) = exclude_tables {
                if !exclude.is_empty() {
                    for table in exclude {
                        cmd.arg("--exclude-table").arg(table);
                    }
                }
            }

            // If include_tables is specified, only dump those tables
            if let Some(ref include) = include_tables {
                if !include.is_empty() {
                    for table in include {
                        cmd.arg("--table").arg(table);
                    }
                }
            }

            cmd.arg("--host")
                .arg(&parts.host)
                .arg("--port")
                .arg(parts.port.to_string())
                .arg("--dbname")
                .arg(&parts.database)
                .arg(format!("--file={}", output_path_owned))
                .env("PGPASSFILE", pgpass.path())
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit());

            // Add username if specified
            if let Some(user) = &parts.user {
                cmd.arg("--username").arg(user);
            }

            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
            for (env_var, value) in &env_vars {
                cmd.env(env_var, value);
            }

            // Apply TCP keepalive parameters to prevent idle connection timeouts
            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
                cmd.env(env_var, value);
            }

            cmd.status().context(
                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
                 Install with:\n\
                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
                 - macOS: brew install postgresql\n\
                 - RHEL/CentOS: sudo yum install postgresql",
            )
        },
        3,                      // Max 3 retries
        Duration::from_secs(1), // Start with 1 second delay
        "pg_dump (dump schema)",
    )
    .await
    .with_context(|| {
        format!(
            "pg_dump failed to dump schema for database '{}'.\n\
             \n\
             Common causes:\n\
             - Database does not exist\n\
             - Connection authentication failed\n\
             - User lacks privileges to read database schema\n\
             - Network connectivity issues\n\
             - Connection timeout or network issues",
            database
        )
    })?;

    tracing::info!("✓ Schema dumped successfully");
    Ok(())
}
356
/// Dump data for a specific database using optimized directory format
///
/// Uses PostgreSQL directory format dump with:
/// - Parallel dumps for faster performance
/// - Maximum compression (level 9)
/// - Large object (blob) support
/// - Directory output for efficient parallel restore
///
/// The number of parallel jobs is automatically determined based on available CPU cores.
///
/// NOTE(review): as with `dump_schema`, the connection targets the database
/// named inside `source_url` (`parts.database`); the `database` argument only
/// drives logging and filter resolution. Confirm callers pass a matching URL.
///
/// # Errors
/// Fails when the URL cannot be parsed, the .pgpass file cannot be created,
/// or pg_dump still fails after the retry budget (3 attempts) is exhausted.
/// pg_dump itself fails if `output_path` already exists (directory format
/// requires a non-existent path).
pub async fn dump_data(
    source_url: &str,
    database: &str,
    output_path: &str,
    filter: &ReplicationFilter,
) -> Result<()> {
    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
    // Falls back to 4 if parallelism cannot be queried.
    let num_cpus = std::thread::available_parallelism()
        .map(|n| n.get().min(8))
        .unwrap_or(4);

    tracing::info!(
        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
        database,
        output_path,
        num_cpus
    );

    // Parse URL and create .pgpass file for secure authentication
    let parts = crate::utils::parse_postgres_url(source_url)
        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
    let pgpass = crate::utils::PgPassFile::new(&parts)
        .context("Failed to create .pgpass file for authentication")?;

    let env_vars = parts.to_pg_env_vars();
    // Owned copy so the retry closure can be re-invoked without borrowing issues
    let output_path_owned = output_path.to_string();

    // Collect filter options (resolved once, outside the retry closure)
    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
    let include_tables = get_included_tables_for_db(filter, database);

    // Wrap subprocess execution with retry logic
    crate::utils::retry_subprocess_with_backoff(
        || {
            let mut cmd = Command::new("pg_dump");
            cmd.arg("--data-only")
                .arg("--no-owner")
                .arg("--format=directory") // Directory format enables parallel operations
                .arg("--blobs") // Include large objects (blobs)
                .arg("--compress=9") // Maximum compression for smaller dump size
                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
                .arg("--verbose"); // Show progress

            // Add table filtering if specified
            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
            // (--exclude-table-data skips only the rows; the table schema is unaffected)
            if let Some(ref exclude) = exclude_tables {
                if !exclude.is_empty() {
                    for table in exclude {
                        cmd.arg("--exclude-table-data").arg(table);
                    }
                }
            }

            // If include_tables is specified, only dump data for those tables
            if let Some(ref include) = include_tables {
                if !include.is_empty() {
                    for table in include {
                        cmd.arg("--table").arg(table);
                    }
                }
            }

            cmd.arg("--host")
                .arg(&parts.host)
                .arg("--port")
                .arg(parts.port.to_string())
                .arg("--dbname")
                .arg(&parts.database)
                .arg(format!("--file={}", output_path_owned))
                .env("PGPASSFILE", pgpass.path())
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit());

            // Add username if specified
            if let Some(user) = &parts.user {
                cmd.arg("--username").arg(user);
            }

            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
            for (env_var, value) in &env_vars {
                cmd.env(env_var, value);
            }

            // Apply TCP keepalive parameters to prevent idle connection timeouts
            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
                cmd.env(env_var, value);
            }

            cmd.status().context(
                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
                 Install with:\n\
                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
                 - macOS: brew install postgresql\n\
                 - RHEL/CentOS: sudo yum install postgresql",
            )
        },
        3,                      // Max 3 retries
        Duration::from_secs(1), // Start with 1 second delay
        "pg_dump (dump data)",
    )
    .await
    .with_context(|| {
        format!(
            "pg_dump failed to dump data for database '{}'.\n\
             \n\
             Common causes:\n\
             - Database does not exist\n\
             - Connection authentication failed\n\
             - User lacks privileges to read table data\n\
             - Network connectivity issues\n\
             - Insufficient disk space for dump directory\n\
             - Output directory already exists (pg_dump requires non-existent path)\n\
             - Connection timeout or network issues",
            database
        )
    })?;

    tracing::info!(
        "✓ Data dumped successfully using {} parallel jobs",
        num_cpus
    );
    Ok(())
}
489
490/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
491/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
492/// (those need their schema created, just not bulk data copied)
493/// Returns schema-qualified names in format: "schema"."table"
494fn get_schema_excluded_tables_for_db(
495    filter: &ReplicationFilter,
496    db_name: &str,
497) -> Option<Vec<String>> {
498    let mut tables = BTreeSet::new();
499
500    // Handle explicit exclude_tables (format: "database.table")
501    // These tables are completely excluded (no schema, no data)
502    if let Some(explicit) = filter.exclude_tables() {
503        for full_name in explicit {
504            let parts: Vec<&str> = full_name.split('.').collect();
505            if parts.len() == 2 && parts[0] == db_name {
506                // Format as "public"."table" for consistency
507                tables.insert(format!("\"public\".\"{}\"", parts[1]));
508            }
509        }
510    }
511
512    if tables.is_empty() {
513        None
514    } else {
515        Some(tables.into_iter().collect())
516    }
517}
518
519/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
520/// Excludes explicit excludes, schema_only tables, and predicate tables
521/// (predicate tables will be copied separately with filtering)
522/// Returns schema-qualified names in format: "schema"."table"
523fn get_data_excluded_tables_for_db(
524    filter: &ReplicationFilter,
525    db_name: &str,
526) -> Option<Vec<String>> {
527    let mut tables = BTreeSet::new();
528
529    // Handle explicit exclude_tables (format: "database.table")
530    // Default to public schema for backward compatibility
531    if let Some(explicit) = filter.exclude_tables() {
532        for full_name in explicit {
533            let parts: Vec<&str> = full_name.split('.').collect();
534            if parts.len() == 2 && parts[0] == db_name {
535                // Format as "public"."table" for consistency
536                tables.insert(format!("\"public\".\"{}\"", parts[1]));
537            }
538        }
539    }
540
541    // schema_only_tables and predicate_tables already return schema-qualified names
542    for table in filter.schema_only_tables(db_name) {
543        tables.insert(table);
544    }
545
546    for (table, _) in filter.predicate_tables(db_name) {
547        tables.insert(table);
548    }
549
550    if tables.is_empty() {
551        None
552    } else {
553        Some(tables.into_iter().collect())
554    }
555}
556
557/// Extract table names for a specific database from include_tables filter
558/// Returns schema-qualified names in format: "schema"."table"
559fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
560    filter.include_tables().map(|tables| {
561        tables
562            .iter()
563            .filter_map(|full_name| {
564                let parts: Vec<&str> = full_name.split('.').collect();
565                if parts.len() == 2 && parts[0] == db_name {
566                    // Format as "public"."table" for consistency
567                    Some(format!("\"public\".\"{}\"", parts[1]))
568                } else {
569                    None
570                }
571            })
572            .collect()
573    })
574}
575
// Unit tests for the filter-name helpers plus integration tests (marked
// #[ignore]) that exercise pg_dumpall/pg_dump against a live server.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Integration test: requires TEST_SOURCE_URL pointing at a reachable
    // PostgreSQL instance and pg_dumpall on PATH. Run via `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let result = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(result.is_ok());
        assert!(output.exists());

        // Verify file contains SQL
        let content = std::fs::read_to_string(&output).unwrap();
        assert!(content.contains("CREATE ROLE") || !content.is_empty());
    }

    // Integration test: same environment requirements as test_dump_globals.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        // Extract database name from URL (last path segment, e.g. ".../mydb")
        let db = url.split('/').next_back().unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(result.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Schema exclusion only includes explicit exclude_tables
        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // A database with no matching entries yields nothing
        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // A database with no matching entries yields nothing
        let tables = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names in original order
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        // Accept either None or an empty list for a database not in the filter
        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let sql = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        // CREATE ROLE is wrapped; the following ALTER ROLE line is untouched
        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Role replicator already exists"));
        assert!(rewritten.contains("CREATE ROLE replicator WITH LOGIN;"));
        assert!(rewritten.contains("ALTER ROLE replicator WITH LOGIN;"));
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let sql = "    CREATE ROLE \"Andre Admin\";\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        // Quoted identifier is unquoted in the notice; leading indentation is preserved
        assert!(rewritten.contains("DO $$"));
        assert!(rewritten.contains("Andre Admin already exists"));
        assert!(rewritten.contains("CREATE ROLE \"Andre Admin\""));
        assert!(rewritten.starts_with("    DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        // No CREATE ROLE present -> None signals "nothing to rewrite"
        let sql = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(sql).is_none());
    }
}