database_replicator/migration/
dump.rs1use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::process::{Command, Stdio};
8use std::time::Duration;
9
10pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
12 tracing::info!("Dumping global objects to {}", output_path);
13
14 let parts = crate::utils::parse_postgres_url(source_url)
16 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
17 let pgpass = crate::utils::PgPassFile::new(&parts)
18 .context("Failed to create .pgpass file for authentication")?;
19
20 let env_vars = parts.to_pg_env_vars();
21 let output_path_owned = output_path.to_string();
22
23 crate::utils::retry_subprocess_with_backoff(
25 || {
26 let mut cmd = Command::new("pg_dumpall");
27 cmd.arg("--globals-only")
28 .arg("--no-role-passwords") .arg("--verbose") .arg("--host")
31 .arg(&parts.host)
32 .arg("--port")
33 .arg(parts.port.to_string())
34 .arg("--database")
35 .arg(&parts.database)
36 .arg(format!("--file={}", output_path_owned))
37 .env("PGPASSFILE", pgpass.path())
38 .stdout(Stdio::inherit())
39 .stderr(Stdio::inherit());
40
41 if let Some(user) = &parts.user {
43 cmd.arg("--username").arg(user);
44 }
45
46 for (env_var, value) in &env_vars {
48 cmd.env(env_var, value);
49 }
50
51 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
53 cmd.env(env_var, value);
54 }
55
56 cmd.status().context(
57 "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
58 Install with:\n\
59 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
60 - macOS: brew install postgresql\n\
61 - RHEL/CentOS: sudo yum install postgresql",
62 )
63 },
64 3, Duration::from_secs(1), "pg_dumpall (dump globals)",
67 )
68 .context(
69 "pg_dumpall failed to dump global objects.\n\
70 \n\
71 Common causes:\n\
72 - Connection authentication failed\n\
73 - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
74 - Network connectivity issues\n\
75 - Invalid connection string\n\
76 - Connection timeout or network issues",
77 )?;
78
79 tracing::info!("✓ Global objects dumped successfully");
80 Ok(())
81}
82
83pub async fn dump_schema(
85 source_url: &str,
86 database: &str,
87 output_path: &str,
88 filter: &ReplicationFilter,
89) -> Result<()> {
90 tracing::info!(
91 "Dumping schema for database '{}' to {}",
92 database,
93 output_path
94 );
95
96 let parts = crate::utils::parse_postgres_url(source_url)
98 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
99 let pgpass = crate::utils::PgPassFile::new(&parts)
100 .context("Failed to create .pgpass file for authentication")?;
101
102 let env_vars = parts.to_pg_env_vars();
103 let output_path_owned = output_path.to_string();
104
105 let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
107 let include_tables = get_included_tables_for_db(filter, database);
108
109 crate::utils::retry_subprocess_with_backoff(
111 || {
112 let mut cmd = Command::new("pg_dump");
113 cmd.arg("--schema-only")
114 .arg("--no-owner") .arg("--no-privileges") .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
121 if !exclude.is_empty() {
122 for table in exclude {
123 cmd.arg("--exclude-table").arg(table);
124 }
125 }
126 }
127
128 if let Some(ref include) = include_tables {
130 if !include.is_empty() {
131 for table in include {
132 cmd.arg("--table").arg(table);
133 }
134 }
135 }
136
137 cmd.arg("--host")
138 .arg(&parts.host)
139 .arg("--port")
140 .arg(parts.port.to_string())
141 .arg("--dbname")
142 .arg(&parts.database)
143 .arg(format!("--file={}", output_path_owned))
144 .env("PGPASSFILE", pgpass.path())
145 .stdout(Stdio::inherit())
146 .stderr(Stdio::inherit());
147
148 if let Some(user) = &parts.user {
150 cmd.arg("--username").arg(user);
151 }
152
153 for (env_var, value) in &env_vars {
155 cmd.env(env_var, value);
156 }
157
158 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
160 cmd.env(env_var, value);
161 }
162
163 cmd.status().context(
164 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
165 Install with:\n\
166 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
167 - macOS: brew install postgresql\n\
168 - RHEL/CentOS: sudo yum install postgresql",
169 )
170 },
171 3, Duration::from_secs(1), "pg_dump (dump schema)",
174 )
175 .with_context(|| {
176 format!(
177 "pg_dump failed to dump schema for database '{}'.\n\
178 \n\
179 Common causes:\n\
180 - Database does not exist\n\
181 - Connection authentication failed\n\
182 - User lacks privileges to read database schema\n\
183 - Network connectivity issues\n\
184 - Connection timeout or network issues",
185 database
186 )
187 })?;
188
189 tracing::info!("✓ Schema dumped successfully");
190 Ok(())
191}
192
193pub async fn dump_data(
203 source_url: &str,
204 database: &str,
205 output_path: &str,
206 filter: &ReplicationFilter,
207) -> Result<()> {
208 let num_cpus = std::thread::available_parallelism()
210 .map(|n| n.get().min(8))
211 .unwrap_or(4);
212
213 tracing::info!(
214 "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
215 database,
216 output_path,
217 num_cpus
218 );
219
220 let parts = crate::utils::parse_postgres_url(source_url)
222 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
223 let pgpass = crate::utils::PgPassFile::new(&parts)
224 .context("Failed to create .pgpass file for authentication")?;
225
226 let env_vars = parts.to_pg_env_vars();
227 let output_path_owned = output_path.to_string();
228
229 let exclude_tables = get_data_excluded_tables_for_db(filter, database);
231 let include_tables = get_included_tables_for_db(filter, database);
232
233 crate::utils::retry_subprocess_with_backoff(
235 || {
236 let mut cmd = Command::new("pg_dump");
237 cmd.arg("--data-only")
238 .arg("--no-owner")
239 .arg("--format=directory") .arg("--blobs") .arg("--compress=9") .arg(format!("--jobs={}", num_cpus)) .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
248 if !exclude.is_empty() {
249 for table in exclude {
250 cmd.arg("--exclude-table-data").arg(table);
251 }
252 }
253 }
254
255 if let Some(ref include) = include_tables {
257 if !include.is_empty() {
258 for table in include {
259 cmd.arg("--table").arg(table);
260 }
261 }
262 }
263
264 cmd.arg("--host")
265 .arg(&parts.host)
266 .arg("--port")
267 .arg(parts.port.to_string())
268 .arg("--dbname")
269 .arg(&parts.database)
270 .arg(format!("--file={}", output_path_owned))
271 .env("PGPASSFILE", pgpass.path())
272 .stdout(Stdio::inherit())
273 .stderr(Stdio::inherit());
274
275 if let Some(user) = &parts.user {
277 cmd.arg("--username").arg(user);
278 }
279
280 for (env_var, value) in &env_vars {
282 cmd.env(env_var, value);
283 }
284
285 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
287 cmd.env(env_var, value);
288 }
289
290 cmd.status().context(
291 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
292 Install with:\n\
293 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
294 - macOS: brew install postgresql\n\
295 - RHEL/CentOS: sudo yum install postgresql",
296 )
297 },
298 3, Duration::from_secs(1), "pg_dump (dump data)",
301 )
302 .with_context(|| {
303 format!(
304 "pg_dump failed to dump data for database '{}'.\n\
305 \n\
306 Common causes:\n\
307 - Database does not exist\n\
308 - Connection authentication failed\n\
309 - User lacks privileges to read table data\n\
310 - Network connectivity issues\n\
311 - Insufficient disk space for dump directory\n\
312 - Output directory already exists (pg_dump requires non-existent path)\n\
313 - Connection timeout or network issues",
314 database
315 )
316 })?;
317
318 tracing::info!(
319 "✓ Data dumped successfully using {} parallel jobs",
320 num_cpus
321 );
322 Ok(())
323}
324
325fn get_schema_excluded_tables_for_db(
330 filter: &ReplicationFilter,
331 db_name: &str,
332) -> Option<Vec<String>> {
333 let mut tables = BTreeSet::new();
334
335 if let Some(explicit) = filter.exclude_tables() {
338 for full_name in explicit {
339 let parts: Vec<&str> = full_name.split('.').collect();
340 if parts.len() == 2 && parts[0] == db_name {
341 tables.insert(format!("\"public\".\"{}\"", parts[1]));
343 }
344 }
345 }
346
347 if tables.is_empty() {
348 None
349 } else {
350 Some(tables.into_iter().collect())
351 }
352}
353
354fn get_data_excluded_tables_for_db(
359 filter: &ReplicationFilter,
360 db_name: &str,
361) -> Option<Vec<String>> {
362 let mut tables = BTreeSet::new();
363
364 if let Some(explicit) = filter.exclude_tables() {
367 for full_name in explicit {
368 let parts: Vec<&str> = full_name.split('.').collect();
369 if parts.len() == 2 && parts[0] == db_name {
370 tables.insert(format!("\"public\".\"{}\"", parts[1]));
372 }
373 }
374 }
375
376 for table in filter.schema_only_tables(db_name) {
378 tables.insert(table);
379 }
380
381 for (table, _) in filter.predicate_tables(db_name) {
382 tables.insert(table);
383 }
384
385 if tables.is_empty() {
386 None
387 } else {
388 Some(tables.into_iter().collect())
389 }
390}
391
392fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
395 filter.include_tables().map(|tables| {
396 tables
397 .iter()
398 .filter_map(|full_name| {
399 let parts: Vec<&str> = full_name.split('.').collect();
400 if parts.len() == 2 && parts[0] == db_name {
401 Some(format!("\"public\".\"{}\"", parts[1]))
403 } else {
404 None
405 }
406 })
407 .collect()
408 })
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Filter that explicitly excludes two tables in `db1` and one in `db2`.
    fn exclusion_filter() -> crate::filters::ReplicationFilter {
        let excluded = ["db1.table1", "db1.table2", "db2.table3"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        crate::filters::ReplicationFilter::new(None, None, None, Some(excluded)).unwrap()
    }

    /// Filter that explicitly includes two tables in `db1` and one in `db2`.
    fn inclusion_filter() -> crate::filters::ReplicationFilter {
        let included = ["db1.users", "db1.orders", "db2.products"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        crate::filters::ReplicationFilter::new(None, None, Some(included), None).unwrap()
    }

    // The two dump tests are ignored by default: they read the connection string
    // from the TEST_SOURCE_URL environment variable and run the real dump functions.

    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let workdir = tempdir().unwrap();
        let globals_file = workdir.path().join("globals.sql");

        let outcome = dump_globals(&source_url, globals_file.to_str().unwrap()).await;

        assert!(outcome.is_ok());
        assert!(globals_file.exists());

        let content = std::fs::read_to_string(&globals_file).unwrap();
        // Any non-empty dump is accepted; a dump containing role definitions is one
        // such case.
        assert!(content.contains("CREATE ROLE") || !content.is_empty());
    }

    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let workdir = tempdir().unwrap();
        let schema_file = workdir.path().join("schema.sql");

        // The last path segment of the URL is taken as the database name.
        let db_name = source_url.rsplit('/').next().unwrap_or("postgres");

        let no_filter = crate::filters::ReplicationFilter::empty();
        let outcome = dump_schema(
            &source_url,
            db_name,
            schema_file.to_str().unwrap(),
            &no_filter,
        )
        .await;

        assert!(outcome.is_ok());
        assert!(schema_file.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = exclusion_filter();

        let db1 = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            db1,
            vec![r#""public"."table1""#, r#""public"."table2""#]
        );

        let db2 = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, vec![r#""public"."table3""#]);

        // A database with no matching entries yields nothing to exclude.
        let db3 = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = exclusion_filter();

        let db1 = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            db1,
            vec![r#""public"."table1""#, r#""public"."table2""#]
        );

        let db2 = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, vec![r#""public"."table3""#]);

        // A database with no matching entries yields nothing to exclude.
        let db3 = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = inclusion_filter();

        // Include patterns keep the order in which they were configured.
        let db1 = get_included_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            db1,
            vec![r#""public"."users""#, r#""public"."orders""#]
        );

        let db2 = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, vec![r#""public"."products""#]);

        let db3 = get_included_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let no_filter = crate::filters::ReplicationFilter::empty();
        assert!(get_schema_excluded_tables_for_db(&no_filter, "db1").is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let no_filter = crate::filters::ReplicationFilter::empty();
        assert!(get_data_excluded_tables_for_db(&no_filter, "db1").is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let no_filter = crate::filters::ReplicationFilter::empty();
        assert!(get_included_tables_for_db(&no_filter, "db1").is_none());
    }
}