// database_replicator/migration/dump.rs

1// ABOUTME: Wrapper for pg_dump command to export database objects
2// ABOUTME: Handles global objects, schema, and data export
3
4use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::process::{Command, Stdio};
8use std::time::Duration;
9
10/// Dump global objects (roles, tablespaces) using pg_dumpall
11pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
12    tracing::info!("Dumping global objects to {}", output_path);
13
14    // Parse URL and create .pgpass file for secure authentication
15    let parts = crate::utils::parse_postgres_url(source_url)
16        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
17    let pgpass = crate::utils::PgPassFile::new(&parts)
18        .context("Failed to create .pgpass file for authentication")?;
19
20    let env_vars = parts.to_pg_env_vars();
21    let output_path_owned = output_path.to_string();
22
23    // Wrap subprocess execution with retry logic
24    crate::utils::retry_subprocess_with_backoff(
25        || {
26            let mut cmd = Command::new("pg_dumpall");
27            cmd.arg("--globals-only")
28                .arg("--no-role-passwords") // Don't dump passwords
29                .arg("--verbose") // Show progress
30                .arg("--host")
31                .arg(&parts.host)
32                .arg("--port")
33                .arg(parts.port.to_string())
34                .arg("--database")
35                .arg(&parts.database)
36                .arg(format!("--file={}", output_path_owned))
37                .env("PGPASSFILE", pgpass.path())
38                .stdout(Stdio::inherit())
39                .stderr(Stdio::inherit());
40
41            // Add username if specified
42            if let Some(user) = &parts.user {
43                cmd.arg("--username").arg(user);
44            }
45
46            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
47            for (env_var, value) in &env_vars {
48                cmd.env(env_var, value);
49            }
50
51            // Apply TCP keepalive parameters to prevent idle connection timeouts
52            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
53                cmd.env(env_var, value);
54            }
55
56            cmd.status().context(
57                "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
58                 Install with:\n\
59                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
60                 - macOS: brew install postgresql\n\
61                 - RHEL/CentOS: sudo yum install postgresql",
62            )
63        },
64        3,                      // Max 3 retries
65        Duration::from_secs(1), // Start with 1 second delay
66        "pg_dumpall (dump globals)",
67    )
68    .context(
69        "pg_dumpall failed to dump global objects.\n\
70         \n\
71         Common causes:\n\
72         - Connection authentication failed\n\
73         - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
74         - Network connectivity issues\n\
75         - Invalid connection string\n\
76         - Connection timeout or network issues",
77    )?;
78
79    tracing::info!("✓ Global objects dumped successfully");
80    Ok(())
81}
82
83/// Dump schema (DDL) for a specific database
84pub async fn dump_schema(
85    source_url: &str,
86    database: &str,
87    output_path: &str,
88    filter: &ReplicationFilter,
89) -> Result<()> {
90    tracing::info!(
91        "Dumping schema for database '{}' to {}",
92        database,
93        output_path
94    );
95
96    // Parse URL and create .pgpass file for secure authentication
97    let parts = crate::utils::parse_postgres_url(source_url)
98        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
99    let pgpass = crate::utils::PgPassFile::new(&parts)
100        .context("Failed to create .pgpass file for authentication")?;
101
102    let env_vars = parts.to_pg_env_vars();
103    let output_path_owned = output_path.to_string();
104
105    // Collect filter options
106    let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
107    let include_tables = get_included_tables_for_db(filter, database);
108
109    // Wrap subprocess execution with retry logic
110    crate::utils::retry_subprocess_with_backoff(
111        || {
112            let mut cmd = Command::new("pg_dump");
113            cmd.arg("--schema-only")
114                .arg("--no-owner") // Don't include ownership commands
115                .arg("--no-privileges") // We'll handle privileges separately
116                .arg("--verbose"); // Show progress
117
118            // Add table filtering if specified
119            // Only exclude explicit exclude_tables from schema dump (NOT schema_only or predicate tables)
120            if let Some(ref exclude) = exclude_tables {
121                if !exclude.is_empty() {
122                    for table in exclude {
123                        cmd.arg("--exclude-table").arg(table);
124                    }
125                }
126            }
127
128            // If include_tables is specified, only dump those tables
129            if let Some(ref include) = include_tables {
130                if !include.is_empty() {
131                    for table in include {
132                        cmd.arg("--table").arg(table);
133                    }
134                }
135            }
136
137            cmd.arg("--host")
138                .arg(&parts.host)
139                .arg("--port")
140                .arg(parts.port.to_string())
141                .arg("--dbname")
142                .arg(&parts.database)
143                .arg(format!("--file={}", output_path_owned))
144                .env("PGPASSFILE", pgpass.path())
145                .stdout(Stdio::inherit())
146                .stderr(Stdio::inherit());
147
148            // Add username if specified
149            if let Some(user) = &parts.user {
150                cmd.arg("--username").arg(user);
151            }
152
153            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
154            for (env_var, value) in &env_vars {
155                cmd.env(env_var, value);
156            }
157
158            // Apply TCP keepalive parameters to prevent idle connection timeouts
159            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
160                cmd.env(env_var, value);
161            }
162
163            cmd.status().context(
164                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
165                 Install with:\n\
166                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
167                 - macOS: brew install postgresql\n\
168                 - RHEL/CentOS: sudo yum install postgresql",
169            )
170        },
171        3,                      // Max 3 retries
172        Duration::from_secs(1), // Start with 1 second delay
173        "pg_dump (dump schema)",
174    )
175    .with_context(|| {
176        format!(
177            "pg_dump failed to dump schema for database '{}'.\n\
178             \n\
179             Common causes:\n\
180             - Database does not exist\n\
181             - Connection authentication failed\n\
182             - User lacks privileges to read database schema\n\
183             - Network connectivity issues\n\
184             - Connection timeout or network issues",
185            database
186        )
187    })?;
188
189    tracing::info!("✓ Schema dumped successfully");
190    Ok(())
191}
192
193/// Dump data for a specific database using optimized directory format
194///
195/// Uses PostgreSQL directory format dump with:
196/// - Parallel dumps for faster performance
197/// - Maximum compression (level 9)
198/// - Large object (blob) support
199/// - Directory output for efficient parallel restore
200///
201/// The number of parallel jobs is automatically determined based on available CPU cores.
202pub async fn dump_data(
203    source_url: &str,
204    database: &str,
205    output_path: &str,
206    filter: &ReplicationFilter,
207) -> Result<()> {
208    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
209    let num_cpus = std::thread::available_parallelism()
210        .map(|n| n.get().min(8))
211        .unwrap_or(4);
212
213    tracing::info!(
214        "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
215        database,
216        output_path,
217        num_cpus
218    );
219
220    // Parse URL and create .pgpass file for secure authentication
221    let parts = crate::utils::parse_postgres_url(source_url)
222        .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
223    let pgpass = crate::utils::PgPassFile::new(&parts)
224        .context("Failed to create .pgpass file for authentication")?;
225
226    let env_vars = parts.to_pg_env_vars();
227    let output_path_owned = output_path.to_string();
228
229    // Collect filter options
230    let exclude_tables = get_data_excluded_tables_for_db(filter, database);
231    let include_tables = get_included_tables_for_db(filter, database);
232
233    // Wrap subprocess execution with retry logic
234    crate::utils::retry_subprocess_with_backoff(
235        || {
236            let mut cmd = Command::new("pg_dump");
237            cmd.arg("--data-only")
238                .arg("--no-owner")
239                .arg("--format=directory") // Directory format enables parallel operations
240                .arg("--blobs") // Include large objects (blobs)
241                .arg("--compress=9") // Maximum compression for smaller dump size
242                .arg(format!("--jobs={}", num_cpus)) // Parallel dump jobs
243                .arg("--verbose"); // Show progress
244
245            // Add table filtering if specified
246            // Exclude explicit excludes, schema_only tables, and predicate tables from data dump
247            if let Some(ref exclude) = exclude_tables {
248                if !exclude.is_empty() {
249                    for table in exclude {
250                        cmd.arg("--exclude-table-data").arg(table);
251                    }
252                }
253            }
254
255            // If include_tables is specified, only dump data for those tables
256            if let Some(ref include) = include_tables {
257                if !include.is_empty() {
258                    for table in include {
259                        cmd.arg("--table").arg(table);
260                    }
261                }
262            }
263
264            cmd.arg("--host")
265                .arg(&parts.host)
266                .arg("--port")
267                .arg(parts.port.to_string())
268                .arg("--dbname")
269                .arg(&parts.database)
270                .arg(format!("--file={}", output_path_owned))
271                .env("PGPASSFILE", pgpass.path())
272                .stdout(Stdio::inherit())
273                .stderr(Stdio::inherit());
274
275            // Add username if specified
276            if let Some(user) = &parts.user {
277                cmd.arg("--username").arg(user);
278            }
279
280            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
281            for (env_var, value) in &env_vars {
282                cmd.env(env_var, value);
283            }
284
285            // Apply TCP keepalive parameters to prevent idle connection timeouts
286            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
287                cmd.env(env_var, value);
288            }
289
290            cmd.status().context(
291                "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
292                 Install with:\n\
293                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
294                 - macOS: brew install postgresql\n\
295                 - RHEL/CentOS: sudo yum install postgresql",
296            )
297        },
298        3,                      // Max 3 retries
299        Duration::from_secs(1), // Start with 1 second delay
300        "pg_dump (dump data)",
301    )
302    .with_context(|| {
303        format!(
304            "pg_dump failed to dump data for database '{}'.\n\
305             \n\
306             Common causes:\n\
307             - Database does not exist\n\
308             - Connection authentication failed\n\
309             - User lacks privileges to read table data\n\
310             - Network connectivity issues\n\
311             - Insufficient disk space for dump directory\n\
312             - Output directory already exists (pg_dump requires non-existent path)\n\
313             - Connection timeout or network issues",
314            database
315        )
316    })?;
317
318    tracing::info!(
319        "✓ Data dumped successfully using {} parallel jobs",
320        num_cpus
321    );
322    Ok(())
323}
324
325/// Extract table names to exclude from SCHEMA dumps (--exclude-table flag)
326/// Only excludes explicit exclude_tables - NOT schema_only or predicate tables
327/// (those need their schema created, just not bulk data copied)
328/// Returns schema-qualified names in format: "schema"."table"
329fn get_schema_excluded_tables_for_db(
330    filter: &ReplicationFilter,
331    db_name: &str,
332) -> Option<Vec<String>> {
333    let mut tables = BTreeSet::new();
334
335    // Handle explicit exclude_tables (format: "database.table")
336    // These tables are completely excluded (no schema, no data)
337    if let Some(explicit) = filter.exclude_tables() {
338        for full_name in explicit {
339            let parts: Vec<&str> = full_name.split('.').collect();
340            if parts.len() == 2 && parts[0] == db_name {
341                // Format as "public"."table" for consistency
342                tables.insert(format!("\"public\".\"{}\"", parts[1]));
343            }
344        }
345    }
346
347    if tables.is_empty() {
348        None
349    } else {
350        Some(tables.into_iter().collect())
351    }
352}
353
354/// Extract table names to exclude from DATA dumps (--exclude-table-data flag)
355/// Excludes explicit excludes, schema_only tables, and predicate tables
356/// (predicate tables will be copied separately with filtering)
357/// Returns schema-qualified names in format: "schema"."table"
358fn get_data_excluded_tables_for_db(
359    filter: &ReplicationFilter,
360    db_name: &str,
361) -> Option<Vec<String>> {
362    let mut tables = BTreeSet::new();
363
364    // Handle explicit exclude_tables (format: "database.table")
365    // Default to public schema for backward compatibility
366    if let Some(explicit) = filter.exclude_tables() {
367        for full_name in explicit {
368            let parts: Vec<&str> = full_name.split('.').collect();
369            if parts.len() == 2 && parts[0] == db_name {
370                // Format as "public"."table" for consistency
371                tables.insert(format!("\"public\".\"{}\"", parts[1]));
372            }
373        }
374    }
375
376    // schema_only_tables and predicate_tables already return schema-qualified names
377    for table in filter.schema_only_tables(db_name) {
378        tables.insert(table);
379    }
380
381    for (table, _) in filter.predicate_tables(db_name) {
382        tables.insert(table);
383    }
384
385    if tables.is_empty() {
386        None
387    } else {
388        Some(tables.into_iter().collect())
389    }
390}
391
392/// Extract table names for a specific database from include_tables filter
393/// Returns schema-qualified names in format: "schema"."table"
394fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
395    filter.include_tables().map(|tables| {
396        tables
397            .iter()
398            .filter_map(|full_name| {
399                let parts: Vec<&str> = full_name.split('.').collect();
400                if parts.len() == 2 && parts[0] == db_name {
401                    // Format as "public"."table" for consistency
402                    Some(format!("\"public\".\"{}\"", parts[1]))
403                } else {
404                    None
405                }
406            })
407            .collect()
408    })
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Integration test: needs a live source database via TEST_SOURCE_URL.
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let result = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(result.is_ok());
        assert!(output.exists());

        // Verify file contains SQL (at minimum it must be non-empty).
        let content = std::fs::read_to_string(&output).unwrap();
        assert!(content.contains("CREATE ROLE") || !content.is_empty());
    }

    /// Integration test: needs a live source database via TEST_SOURCE_URL.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        // Extract the database name from the URL's final path segment.
        // Fix: strip any query string first — a URL like
        // "postgres://host/db?sslmode=require" previously produced the bogus
        // database name "db?sslmode=require".
        let db = url
            .split('/')
            .next_back()
            .map(|segment| segment.split('?').next().unwrap_or(segment))
            .unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let result = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(result.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Schema exclusion only includes explicit exclude_tables
        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // No entries for db3: accept either None or an empty vec.
        let tables = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        // Data exclusion includes explicit exclude_tables, schema_only, and predicate tables
        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // No entries for db3: accept either None or an empty vec.
        let tables = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        // Should return schema-qualified names in original order
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        // No entries for db3: accept either None or an empty vec.
        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.is_none() || tables.unwrap().is_empty());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }
}