database_replicator/migration/
dump.rs

1// ABOUTME: Wrapper for pg_dump command to export database objects
2// ABOUTME: Handles global objects, schema, and data export
3
4use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::process::{Command, Stdio};
8use std::time::Duration;
9
10/// Dump global objects (roles, tablespaces) using pg_dumpall
11pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
12    tracing::info!("Dumping global objects to {}", output_path);
13
14    // Parse URL and create .pgpass file for secure authentication
15    let parts = crate::utils::parse_postgres_url(source_url)
16        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
17    let pgpass = crate::utils::PgPassFile::new(&parts)
18        .context("Failed to create .pgpass file for authentication")?;
19
20    let env_vars = parts.to_pg_env_vars();
21    let output_path_owned = output_path.to_string();
22
23    // Wrap subprocess execution with retry logic
24    crate::utils::retry_subprocess_with_backoff(
25        || {
26            let mut cmd = Command::new("pg_dumpall");
27            cmd.arg("--globals-only")
28                .arg("--no-role-passwords") // Don't dump passwords
29                .arg("--verbose") // Show progress
30                .arg("--host")
31                .arg(&parts.host)
32                .arg("--port")
33                .arg(parts.port.to_string())
34                .arg("--database")
35                .arg(&parts.database)
36                .arg(format!("--file={}", output_path_owned))
37                .env("PGPASSFILE", pgpass.path())
38                .stdout(Stdio::inherit())
39                .stderr(Stdio::inherit());
40
41            // Add username if specified
42            if let Some(user) = &parts.user {
43                cmd.arg("--username").arg(user);
44            }
45
46            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
47            for (env_var, value) in &env_vars {
48                cmd.env(env_var, value);
49            }
50
51            // Apply TCP keepalive parameters to prevent idle connection timeouts
52            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
53                cmd.env(env_var, value);
54            }
55
56            cmd.status().context(
57                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
58                 Install with:\n\
59                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
60                 - macOS: brew install postgresql\n\
61                 - RHEL/CentOS: sudo yum install postgresql",
62            )
63        },
64        3,                      // Max 3 retries
65        Duration::from_secs(1), // Start with 1 second delay
66        "pg_dumpall (dump globals)",
67    )
68    .await
69    .context(
70        "pg_dumpall failed to dump global objects.\n\
71         \n\
72         Common causes:\n\
73         - Connection authentication failed\n\
74         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
75         - Network connectivity issues\n\
76         - Invalid connection string\n\
77         - Connection timeout or network issues",
78    )?;
79
80    tracing::info!("✓ Global objects dumped successfully");
81    Ok(())
82}
83
84/// Dump schema (DDL) for a specific database
85pub async fn dump_schema(
86    source_url: &str,
87    database: &str,
88    output_path: &str,
89    filter: &ReplicationFilter,
90) -> Result<()> {
91    tracing::info!(
92        "Dumping schema for database '{}' to {}",
93        database,
94        output_path
95    );
96
97    // Parse URL and create .pgpass file for secure authentication
98    let parts = crate::utils::parse_postgres_url(source_url)
99        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
100    let pgpass = crate::utils::PgPassFile::new(&parts)
101        .context("Failed to create .pgpass file for authentication")?;
102
103    let env_vars = parts.to_pg_env_vars();
104    let output_path_owned = output_path.to_string();
105
106    // Collect filter options
107    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
108    let include_tables = get_included_tables_for_db(filter, database);
109
110    // Wrap subprocess execution with retry logic
111    crate::utils::retry_subprocess_with_backoff(
112        || {
113            let mut cmd = Command::new("pg_dump");
114            cmd.arg("--schema-only")
115                .arg("--no-owner") // Don't include ownership commands
116                .arg("--no-privileges") // We'll handle privileges separately
117                .arg("--verbose"); // Show progress
118
119            // Add table filtering if specified
120            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
121            if let Some(ref exclude) = exclude_tables {
122                if !exclude.is_empty() {
123                    for table in exclude {
124                        cmd.arg("--exclude-table").arg(table);
125                    }
126                }
127            }
128
129            // If include_tables is specified, only dump those tables
130            if let Some(ref include) = include_tables {
131                if !include.is_empty() {
132                    for table in include {
133                        cmd.arg("--table").arg(table);
134                    }
135                }
136            }
137
138            cmd.arg("--host")
139                .arg(&parts.host)
140                .arg("--port")
141                .arg(parts.port.to_string())
142                .arg("--dbname")
143                .arg(&parts.database)
144                .arg(format!("--file={}", output_path_owned))
145                .env("PGPASSFILE", pgpass.path())
146                .stdout(Stdio::inherit())
147                .stderr(Stdio::inherit());
148
149            // Add username if specified
150            if let Some(user) = &parts.user {
151                cmd.arg("--username").arg(user);
152            }
153
154            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
155            for (env_var, value) in &env_vars {
156                cmd.env(env_var, value);
157            }
158
159            // Apply TCP keepalive parameters to prevent idle connection timeouts
160            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
161                cmd.env(env_var, value);
162            }
163
164            cmd.status().context(
165                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
166                 Install with:\n\
167                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
168                 - macOS: brew install postgresql\n\
169                 - RHEL/CentOS: sudo yum install postgresql",
170            )
171        },
172        3,                      // Max 3 retries
173        Duration::from_secs(1), // Start with 1 second delay
174        "pg_dump (dump schema)",
175    )
176    .await
177    .with_context(|| {
178        format!(
179            "pg_dump failed to dump schema for database '{}'.\n\
180             \n\
181             Common causes:\n\
182             - Database does not exist\n\
183             - Connection authentication failed\n\
184             - User lacks privileges to read database schema\n\
185             - Network connectivity issues\n\
186             - Connection timeout or network issues",
187            database
188        )
189    })?;
190
191    tracing::info!("✓ Schema dumped successfully");
192    Ok(())
193}
194
195/// Dump data for a specific database using optimized directory format
196///
197/// Uses PostgreSQL directory format dump with:
198/// - Parallel dumps for faster performance
199/// - Maximum compression (level 9)
200/// - Large object (blob) support
201/// - Directory output for efficient parallel restore
202///
203/// The number of parallel jobs is automatically determined based on available CPU cores.
204pub async fn dump_data(
205    source_url: &str,
206    database: &str,
207    output_path: &str,
208    filter: &ReplicationFilter,
209) -> Result<()> {
210    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
211    let num_cpus = std::thread::available_parallelism()
212        .map(|n| n.get().min(8))
213        .unwrap_or(4);
214
215    tracing::info!(
216        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
217        database,
218        output_path,
219        num_cpus
220    );
221
222    // Parse URL and create .pgpass file for secure authentication
223    let parts = crate::utils::parse_postgres_url(source_url)
224        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
225    let pgpass = crate::utils::PgPassFile::new(&parts)
226        .context("Failed to create .pgpass file for authentication")?;
227
228    let env_vars = parts.to_pg_env_vars();
229    let output_path_owned = output_path.to_string();
230
231    // Collect filter options
232    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
233    let include_tables = get_included_tables_for_db(filter, database);
234
235    // Wrap subprocess execution with retry logic
236    crate::utils::retry_subprocess_with_backoff(
237        || {
238            let mut cmd = Command::new("pg_dump");
239            cmd.arg("--data-only")
240                .arg("--no-owner")
241                .arg("--format=directory") // Directory format enables parallel operations
242                .arg("--blobs") // Include large objects (blobs)
243                .arg("--compress=9") // Maximum compression for smaller dump size
244                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
245                .arg("--verbose"); // Show progress
246
247            // Add table filtering if specified
248            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
249            if let Some(ref exclude) = exclude_tables {
250                if !exclude.is_empty() {
251                    for table in exclude {
252                        cmd.arg("--exclude-table-data").arg(table);
253                    }
254                }
255            }
256
257            // If include_tables is specified, only dump data for those tables
258            if let Some(ref include) = include_tables {
259                if !include.is_empty() {
260                    for table in include {
261                        cmd.arg("--table").arg(table);
262                    }
263                }
264            }
265
266            cmd.arg("--host")
267                .arg(&parts.host)
268                .arg("--port")
269                .arg(parts.port.to_string())
270                .arg("--dbname")
271                .arg(&parts.database)
272                .arg(format!("--file={}", output_path_owned))
273                .env("PGPASSFILE", pgpass.path())
274                .stdout(Stdio::inherit())
275                .stderr(Stdio::inherit());
276
277            // Add username if specified
278            if let Some(user) = &parts.user {
279                cmd.arg("--username").arg(user);
280            }
281
282            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
283            for (env_var, value) in &env_vars {
284                cmd.env(env_var, value);
285            }
286
287            // Apply TCP keepalive parameters to prevent idle connection timeouts
288            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
289                cmd.env(env_var, value);
290            }
291
292            cmd.status().context(
293                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
294                 Install with:\n\
295                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
296                 - macOS: brew install postgresql\n\
297                 - RHEL/CentOS: sudo yum install postgresql",
298            )
299        },
300        3,                      // Max 3 retries
301        Duration::from_secs(1), // Start with 1 second delay
302        "pg_dump (dump data)",
303    )
304    .await
305    .with_context(|| {
306        format!(
307            "pg_dump failed to dump data for database '{}'.\n\
308             \n\
309             Common causes:\n\
310             - Database does not exist\n\
311             - Connection authentication failed\n\
312             - User lacks privileges to read table data\n\
313             - Network connectivity issues\n\
314             - Insufficient disk space for dump directory\n\
315             - Output directory already exists (pg_dump requires non-existent path)\n\
316             - Connection timeout or network issues",
317            database
318        )
319    })?;
320
321    tracing::info!(
322        "✓ Data dumped successfully using {} parallel jobs",
323        num_cpus
324    );
325    Ok(())
326}
327
328/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
329/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
330/// (those need their schema created, just not bulk data copied)
331/// Returns schema-qualified names in format: "schema"."table"
332fn get_schema_excluded_tables_for_db(
333    filter: &ReplicationFilter,
334    db_name: &str,
335) -> Option<Vec<String>> {
336    let mut tables = BTreeSet::new();
337
338    // Handle explicit exclude_tables (format: "database.table")
339    // These tables are completely excluded (no schema, no data)
340    if let Some(explicit) = filter.exclude_tables() {
341        for full_name in explicit {
342            let parts: Vec<&str> = full_name.split('.').collect();
343            if parts.len() == 2 && parts[0] == db_name {
344                // Format as "public"."table" for consistency
345                tables.insert(format!("\"public\".\"{}\"", parts[1]));
346            }
347        }
348    }
349
350    if tables.is_empty() {
351        None
352    } else {
353        Some(tables.into_iter().collect())
354    }
355}
356
357/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
358/// Excludes explicit excludes, schema_only tables, and predicate tables
359/// (predicate tables will be copied separately with filtering)
360/// Returns schema-qualified names in format: "schema"."table"
361fn get_data_excluded_tables_for_db(
362    filter: &ReplicationFilter,
363    db_name: &str,
364) -> Option<Vec<String>> {
365    let mut tables = BTreeSet::new();
366
367    // Handle explicit exclude_tables (format: "database.table")
368    // Default to public schema for backward compatibility
369    if let Some(explicit) = filter.exclude_tables() {
370        for full_name in explicit {
371            let parts: Vec<&str> = full_name.split('.').collect();
372            if parts.len() == 2 && parts[0] == db_name {
373                // Format as "public"."table" for consistency
374                tables.insert(format!("\"public\".\"{}\"", parts[1]));
375            }
376        }
377    }
378
379    // schema_only_tables and predicate_tables already return schema-qualified names
380    for table in filter.schema_only_tables(db_name) {
381        tables.insert(table);
382    }
383
384    for (table, _) in filter.predicate_tables(db_name) {
385        tables.insert(table);
386    }
387
388    if tables.is_empty() {
389        None
390    } else {
391        Some(tables.into_iter().collect())
392    }
393}
394
395/// Extract table names for a specific database from include_tables filter
396/// Returns schema-qualified names in format: "schema"."table"
397fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
398    filter.include_tables().map(|tables| {
399        tables
400            .iter()
401            .filter_map(|full_name| {
402                let parts: Vec<&str> = full_name.split('.').collect();
403                if parts.len() == 2 && parts[0] == db_name {
404                    // Format as "public"."table" for consistency
405                    Some(format!("\"public\".\"{}\"", parts[1]))
406                } else {
407                    None
408                }
409            })
410            .collect()
411    })
412}
413
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Integration test: needs a reachable database via TEST_SOURCE_URL and a
    // local pg_dumpall binary, so it is #[ignore]d by default
    // (run with `cargo test -- --ignored`).
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let result = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(result.is_ok());
        assert!(output.exists());

        // Verify file contains SQL
        // (a server with no custom roles may emit no CREATE ROLE, hence the
        // weaker non-empty fallback check)
        let content = std::fs::read_to_string(&output).unwrap();
        assert!(content.contains("CREATE ROLE") || !content.is_empty());
    }

    // Integration test: same TEST_SOURCE_URL requirement as above.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        // Extract database name from URL
        // (last path segment; assumes the URL carries no query string —
        // TODO confirm for URLs with ?sslmode=... suffixes)
        let db = url.split('/').next_back().unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(result.is_ok());
        assert!(output.exists());
    }

    // Pure unit test: schema exclusion only honors explicit exclude_tables.
    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Schema exclusion only includes explicit exclude_tables
        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // A database with no matching entries yields None (or an empty list).
        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    // Pure unit test: data exclusion merges explicit, schema_only, and
    // predicate tables (only explicit entries are exercised here).
    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        let tables = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    // Pure unit test: include filter is partitioned per database.
    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names in original order
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    // Empty-filter edge cases: all helpers must report None, not Some(vec![]).
    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }
}