database_replicator/migration/
dump.rs1use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::process::{Command, Stdio};
8use std::time::Duration;
9
10pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
12 tracing::info!("Dumping global objects to {}", output_path);
13
14 let parts = crate::utils::parse_postgres_url(source_url)
16 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
17 let pgpass = crate::utils::PgPassFile::new(&parts)
18 .context("Failed to create .pgpass file for authentication")?;
19
20 let env_vars = parts.to_pg_env_vars();
21 let output_path_owned = output_path.to_string();
22
23 crate::utils::retry_subprocess_with_backoff(
25 || {
26 let mut cmd = Command::new("pg_dumpall");
27 cmd.arg("--globals-only")
28 .arg("--no-role-passwords") .arg("--verbose") .arg("--host")
31 .arg(&parts.host)
32 .arg("--port")
33 .arg(parts.port.to_string())
34 .arg("--database")
35 .arg(&parts.database)
36 .arg(format!("--file={}", output_path_owned))
37 .env("PGPASSFILE", pgpass.path())
38 .stdout(Stdio::inherit())
39 .stderr(Stdio::inherit());
40
41 if let Some(user) = &parts.user {
43 cmd.arg("--username").arg(user);
44 }
45
46 for (env_var, value) in &env_vars {
48 cmd.env(env_var, value);
49 }
50
51 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
53 cmd.env(env_var, value);
54 }
55
56 cmd.status().context(
57 "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
58 Install with:\n\
59 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
60 - macOS: brew install postgresql\n\
61 - RHEL/CentOS: sudo yum install postgresql",
62 )
63 },
64 3, Duration::from_secs(1), "pg_dumpall (dump globals)",
67 )
68 .await
69 .context(
70 "pg_dumpall failed to dump global objects.\n\
71 \n\
72 Common causes:\n\
73 - Connection authentication failed\n\
74 - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
75 - Network connectivity issues\n\
76 - Invalid connection string\n\
77 - Connection timeout or network issues",
78 )?;
79
80 tracing::info!("✓ Global objects dumped successfully");
81 Ok(())
82}
83
84pub async fn dump_schema(
86 source_url: &str,
87 database: &str,
88 output_path: &str,
89 filter: &ReplicationFilter,
90) -> Result<()> {
91 tracing::info!(
92 "Dumping schema for database '{}' to {}",
93 database,
94 output_path
95 );
96
97 let parts = crate::utils::parse_postgres_url(source_url)
99 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
100 let pgpass = crate::utils::PgPassFile::new(&parts)
101 .context("Failed to create .pgpass file for authentication")?;
102
103 let env_vars = parts.to_pg_env_vars();
104 let output_path_owned = output_path.to_string();
105
106 let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
108 let include_tables = get_included_tables_for_db(filter, database);
109
110 crate::utils::retry_subprocess_with_backoff(
112 || {
113 let mut cmd = Command::new("pg_dump");
114 cmd.arg("--schema-only")
115 .arg("--no-owner") .arg("--no-privileges") .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
122 if !exclude.is_empty() {
123 for table in exclude {
124 cmd.arg("--exclude-table").arg(table);
125 }
126 }
127 }
128
129 if let Some(ref include) = include_tables {
131 if !include.is_empty() {
132 for table in include {
133 cmd.arg("--table").arg(table);
134 }
135 }
136 }
137
138 cmd.arg("--host")
139 .arg(&parts.host)
140 .arg("--port")
141 .arg(parts.port.to_string())
142 .arg("--dbname")
143 .arg(&parts.database)
144 .arg(format!("--file={}", output_path_owned))
145 .env("PGPASSFILE", pgpass.path())
146 .stdout(Stdio::inherit())
147 .stderr(Stdio::inherit());
148
149 if let Some(user) = &parts.user {
151 cmd.arg("--username").arg(user);
152 }
153
154 for (env_var, value) in &env_vars {
156 cmd.env(env_var, value);
157 }
158
159 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
161 cmd.env(env_var, value);
162 }
163
164 cmd.status().context(
165 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
166 Install with:\n\
167 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
168 - macOS: brew install postgresql\n\
169 - RHEL/CentOS: sudo yum install postgresql",
170 )
171 },
172 3, Duration::from_secs(1), "pg_dump (dump schema)",
175 )
176 .await
177 .with_context(|| {
178 format!(
179 "pg_dump failed to dump schema for database '{}'.\n\
180 \n\
181 Common causes:\n\
182 - Database does not exist\n\
183 - Connection authentication failed\n\
184 - User lacks privileges to read database schema\n\
185 - Network connectivity issues\n\
186 - Connection timeout or network issues",
187 database
188 )
189 })?;
190
191 tracing::info!("✓ Schema dumped successfully");
192 Ok(())
193}
194
195pub async fn dump_data(
205 source_url: &str,
206 database: &str,
207 output_path: &str,
208 filter: &ReplicationFilter,
209) -> Result<()> {
210 let num_cpus = std::thread::available_parallelism()
212 .map(|n| n.get().min(8))
213 .unwrap_or(4);
214
215 tracing::info!(
216 "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
217 database,
218 output_path,
219 num_cpus
220 );
221
222 let parts = crate::utils::parse_postgres_url(source_url)
224 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
225 let pgpass = crate::utils::PgPassFile::new(&parts)
226 .context("Failed to create .pgpass file for authentication")?;
227
228 let env_vars = parts.to_pg_env_vars();
229 let output_path_owned = output_path.to_string();
230
231 let exclude_tables = get_data_excluded_tables_for_db(filter, database);
233 let include_tables = get_included_tables_for_db(filter, database);
234
235 crate::utils::retry_subprocess_with_backoff(
237 || {
238 let mut cmd = Command::new("pg_dump");
239 cmd.arg("--data-only")
240 .arg("--no-owner")
241 .arg("--format=directory") .arg("--blobs") .arg("--compress=9") .arg(format!("--jobs={}", num_cpus)) .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
250 if !exclude.is_empty() {
251 for table in exclude {
252 cmd.arg("--exclude-table-data").arg(table);
253 }
254 }
255 }
256
257 if let Some(ref include) = include_tables {
259 if !include.is_empty() {
260 for table in include {
261 cmd.arg("--table").arg(table);
262 }
263 }
264 }
265
266 cmd.arg("--host")
267 .arg(&parts.host)
268 .arg("--port")
269 .arg(parts.port.to_string())
270 .arg("--dbname")
271 .arg(&parts.database)
272 .arg(format!("--file={}", output_path_owned))
273 .env("PGPASSFILE", pgpass.path())
274 .stdout(Stdio::inherit())
275 .stderr(Stdio::inherit());
276
277 if let Some(user) = &parts.user {
279 cmd.arg("--username").arg(user);
280 }
281
282 for (env_var, value) in &env_vars {
284 cmd.env(env_var, value);
285 }
286
287 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
289 cmd.env(env_var, value);
290 }
291
292 cmd.status().context(
293 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
294 Install with:\n\
295 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
296 - macOS: brew install postgresql\n\
297 - RHEL/CentOS: sudo yum install postgresql",
298 )
299 },
300 3, Duration::from_secs(1), "pg_dump (dump data)",
303 )
304 .await
305 .with_context(|| {
306 format!(
307 "pg_dump failed to dump data for database '{}'.\n\
308 \n\
309 Common causes:\n\
310 - Database does not exist\n\
311 - Connection authentication failed\n\
312 - User lacks privileges to read table data\n\
313 - Network connectivity issues\n\
314 - Insufficient disk space for dump directory\n\
315 - Output directory already exists (pg_dump requires non-existent path)\n\
316 - Connection timeout or network issues",
317 database
318 )
319 })?;
320
321 tracing::info!(
322 "✓ Data dumped successfully using {} parallel jobs",
323 num_cpus
324 );
325 Ok(())
326}
327
328fn get_schema_excluded_tables_for_db(
333 filter: &ReplicationFilter,
334 db_name: &str,
335) -> Option<Vec<String>> {
336 let mut tables = BTreeSet::new();
337
338 if let Some(explicit) = filter.exclude_tables() {
341 for full_name in explicit {
342 let parts: Vec<&str> = full_name.split('.').collect();
343 if parts.len() == 2 && parts[0] == db_name {
344 tables.insert(format!("\"public\".\"{}\"", parts[1]));
346 }
347 }
348 }
349
350 if tables.is_empty() {
351 None
352 } else {
353 Some(tables.into_iter().collect())
354 }
355}
356
357fn get_data_excluded_tables_for_db(
362 filter: &ReplicationFilter,
363 db_name: &str,
364) -> Option<Vec<String>> {
365 let mut tables = BTreeSet::new();
366
367 if let Some(explicit) = filter.exclude_tables() {
370 for full_name in explicit {
371 let parts: Vec<&str> = full_name.split('.').collect();
372 if parts.len() == 2 && parts[0] == db_name {
373 tables.insert(format!("\"public\".\"{}\"", parts[1]));
375 }
376 }
377 }
378
379 for table in filter.schema_only_tables(db_name) {
381 tables.insert(table);
382 }
383
384 for (table, _) in filter.predicate_tables(db_name) {
385 tables.insert(table);
386 }
387
388 if tables.is_empty() {
389 None
390 } else {
391 Some(tables.into_iter().collect())
392 }
393}
394
395fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
398 filter.include_tables().map(|tables| {
399 tables
400 .iter()
401 .filter_map(|full_name| {
402 let parts: Vec<&str> = full_name.split('.').collect();
403 if parts.len() == 2 && parts[0] == db_name {
404 Some(format!("\"public\".\"{}\"", parts[1]))
406 } else {
407 None
408 }
409 })
410 .collect()
411 })
412}
413
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Extracts the database name from a `postgresql://…/dbname?params` URL.
    ///
    /// The query string is dropped. Falls back to `postgres` when the last
    /// path segment is empty.
    fn database_from_url(url: &str) -> &str {
        url.rsplit('/')
            .next()
            .and_then(|tail| tail.split('?').next())
            .filter(|name| !name.is_empty())
            .unwrap_or("postgres")
    }

    #[test]
    fn test_database_from_url() {
        assert_eq!(database_from_url("postgresql://u:p@h:5432/app"), "app");
        assert_eq!(
            database_from_url("postgresql://u:p@h:5432/app?sslmode=require"),
            "app"
        );
        assert_eq!(database_from_url("postgresql://u:p@h:5432/"), "postgres");
    }

    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        dump_globals(&url, output.to_str().unwrap())
            .await
            .expect("dump_globals failed");
        assert!(output.exists());

        let content = std::fs::read_to_string(&output).unwrap();
        assert!(!content.is_empty(), "pg_dumpall wrote an empty globals file");
    }

    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        let db = database_from_url(&url);

        let filter = crate::filters::ReplicationFilter::empty();
        dump_schema(&url, db, output.to_str().unwrap(), &filter)
            .await
            .expect("dump_schema failed");
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        let tables = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // The helper reports "nothing excluded" as `None`, never `Some(vec![])`.
        assert!(get_schema_excluded_tables_for_db(&filter, "db3").is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            None,
            Some(vec![
                "db1.table1".to_string(),
                "db1.table2".to_string(),
                "db2.table3".to_string(),
            ]),
        )
        .unwrap();

        let tables = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            tables,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let tables = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"table3\""]);

        // The helper reports "nothing excluded" as `None`, never `Some(vec![])`.
        assert!(get_data_excluded_tables_for_db(&filter, "db3").is_none());
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = crate::filters::ReplicationFilter::new(
            None,
            None,
            Some(vec![
                "db1.users".to_string(),
                "db1.orders".to_string(),
                "db2.products".to_string(),
            ]),
            None,
        )
        .unwrap();

        let tables = get_included_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            tables,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let tables = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(tables, vec!["\"public\".\"products\""]);

        let tables = get_included_tables_for_db(&filter, "db3");
        assert!(tables.unwrap_or_default().is_empty());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_schema_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_data_excluded_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        let tables = get_included_tables_for_db(&filter, "db1");
        assert!(tables.is_none());
    }
}