database_replicator/migration/
dump.rs1use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13 tracing::info!("Dumping global objects to {}", output_path);
14
15 let parts = crate::utils::parse_postgres_url(source_url)
17 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18 let pgpass = crate::utils::PgPassFile::new(&parts)
19 .context("Failed to create .pgpass file for authentication")?;
20
21 let env_vars = parts.to_pg_env_vars();
22 let output_path_owned = output_path.to_string();
23
24 crate::utils::retry_subprocess_with_backoff(
26 || {
27 let mut cmd = Command::new("pg_dumpall");
28 cmd.arg("--globals-only")
29 .arg("--no-role-passwords") .arg("--verbose") .arg("--host")
32 .arg(&parts.host)
33 .arg("--port")
34 .arg(parts.port.to_string())
35 .arg("--database")
36 .arg(&parts.database)
37 .arg(format!("--file={}", output_path_owned))
38 .env("PGPASSFILE", pgpass.path())
39 .stdout(Stdio::inherit())
40 .stderr(Stdio::inherit());
41
42 if let Some(user) = &parts.user {
44 cmd.arg("--username").arg(user);
45 }
46
47 for (env_var, value) in &env_vars {
49 cmd.env(env_var, value);
50 }
51
52 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54 cmd.env(env_var, value);
55 }
56
57 cmd.status().context(
58 "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59 Install with:\n\
60 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61 - macOS: brew install postgresql\n\
62 - RHEL/CentOS: sudo yum install postgresql",
63 )
64 },
65 3, Duration::from_secs(1), "pg_dumpall (dump globals)",
68 )
69 .await
70 .context(
71 "pg_dumpall failed to dump global objects.\n\
72 \n\
73 Common causes:\n\
74 - Connection authentication failed\n\
75 - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76 - Network connectivity issues\n\
77 - Invalid connection string\n\
78 - Connection timeout or network issues",
79 )?;
80
81 tracing::info!("✓ Global objects dumped successfully");
82 Ok(())
83}
84
85pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96 let content = fs::read_to_string(path)
97 .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99 if let Some(updated) = rewrite_create_role_statements(&content) {
100 fs::write(path, updated)
101 .with_context(|| format!("Failed to update globals dump at {}", path))?;
102 }
103
104 Ok(())
105}
106
107fn rewrite_create_role_statements(sql: &str) -> Option<String> {
108 if sql.is_empty() {
109 return None;
110 }
111
112 let mut output = String::with_capacity(sql.len() + 1024);
113 let mut modified = false;
114 let mut cursor = 0;
115
116 while cursor < sql.len() {
117 if let Some(rel_pos) = sql[cursor..].find('\n') {
118 let end = cursor + rel_pos + 1;
119 let chunk = &sql[cursor..end];
120 if let Some(transformed) = wrap_create_role_line(chunk) {
121 output.push_str(&transformed);
122 modified = true;
123 } else {
124 output.push_str(chunk);
125 }
126 cursor = end;
127 } else {
128 let chunk = &sql[cursor..];
129 if let Some(transformed) = wrap_create_role_line(chunk) {
130 output.push_str(&transformed);
131 modified = true;
132 } else {
133 output.push_str(chunk);
134 }
135 break;
136 }
137 }
138
139 if modified {
140 Some(output)
141 } else {
142 None
143 }
144}
145
146fn wrap_create_role_line(chunk: &str) -> Option<String> {
147 let trimmed = chunk.trim_start();
148 if !trimmed.starts_with("CREATE ROLE ") {
149 return None;
150 }
151
152 let statement = trimmed.trim_end();
153 let statement_body = statement.trim_end_matches(';').trim_end();
154 let leading_ws_len = chunk.len() - trimmed.len();
155 let leading_ws = &chunk[..leading_ws_len];
156 let newline = if chunk.ends_with("\r\n") {
157 "\r\n"
158 } else if chunk.ends_with('\n') {
159 "\n"
160 } else {
161 ""
162 };
163
164 let role_token = extract_role_token(statement_body)?;
165
166 let notice_name = escape_single_quotes(&unquote_role_name(&role_token));
167
168 let mut block = String::with_capacity(chunk.len() + 128);
169 block.push_str(leading_ws);
170 block.push_str("DO $$\n");
171 block.push_str(leading_ws);
172 block.push_str("BEGIN\n");
173 block.push_str(leading_ws);
174 block.push_str(" ");
175 block.push_str(statement_body);
176 block.push_str(";\n");
177 block.push_str(leading_ws);
178 block.push_str("EXCEPTION\n");
179 block.push_str(leading_ws);
180 block.push_str(" WHEN duplicate_object THEN\n");
181 block.push_str(leading_ws);
182 block.push_str(" RAISE NOTICE 'Role ");
183 block.push_str(¬ice_name);
184 block.push_str(" already exists on target, skipping CREATE ROLE';\n");
185 block.push_str(leading_ws);
186 block.push_str("END $$;");
187
188 if !newline.is_empty() {
189 block.push_str(newline);
190 }
191
192 Some(block)
193}
194
/// Extract the role-name token that follows `CREATE ROLE` in `statement`.
///
/// * A double-quoted identifier is returned with its surrounding quotes, and
///   `""` inside it is treated as an escaped quote.
/// * An unquoted identifier runs up to the first whitespace or `;`.
///
/// Returns `None` when `statement` does not start with `CREATE ROLE`, when no
/// name follows, or when a quoted identifier is never closed.
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if let Some(quoted) = remainder.strip_prefix('"') {
        let mut chars = quoted.char_indices().peekable();
        while let Some((idx, ch)) = chars.next() {
            if ch != '"' {
                continue;
            }
            if matches!(chars.peek(), Some((_, '"'))) {
                // `""` is an escaped quote inside the identifier, not the end.
                chars.next();
                continue;
            }
            // `idx` is relative to `quoted`: +1 for the opening quote that was
            // stripped, +1 to include this closing quote.
            return Some(remainder[..idx + 2].to_string());
        }
        // Ran out of input without a closing quote: not a usable token.
        None
    } else {
        let end = remainder
            .find(|c: char| c.is_whitespace() || c == ';')
            .unwrap_or(remainder.len());
        if end == 0 {
            None
        } else {
            Some(remainder[..end].to_string())
        }
    }
}
232
/// Strip the surrounding double quotes from a quoted identifier and collapse
/// each escaped `""` into a single `"`.
///
/// Tokens that are not wrapped in a pair of double quotes are returned unchanged.
fn unquote_role_name(token: &str) -> String {
    let inner = token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'));

    match inner {
        Some(name) => name.replace("\"\"", "\""),
        // Also covers a lone `"`, which has no separate closing quote.
        None => token.to_string(),
    }
}
241
/// Double every single quote in `value` so it can be embedded in a
/// single-quoted SQL string literal.
fn escape_single_quotes(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
245
246pub async fn dump_schema(
248 source_url: &str,
249 database: &str,
250 output_path: &str,
251 filter: &ReplicationFilter,
252) -> Result<()> {
253 tracing::info!(
254 "Dumping schema for database '{}' to {}",
255 database,
256 output_path
257 );
258
259 let parts = crate::utils::parse_postgres_url(source_url)
261 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
262 let pgpass = crate::utils::PgPassFile::new(&parts)
263 .context("Failed to create .pgpass file for authentication")?;
264
265 let env_vars = parts.to_pg_env_vars();
266 let output_path_owned = output_path.to_string();
267
268 let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
270 let include_tables = get_included_tables_for_db(filter, database);
271
272 crate::utils::retry_subprocess_with_backoff(
274 || {
275 let mut cmd = Command::new("pg_dump");
276 cmd.arg("--schema-only")
277 .arg("--no-owner") .arg("--no-privileges") .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
284 if !exclude.is_empty() {
285 for table in exclude {
286 cmd.arg("--exclude-table").arg(table);
287 }
288 }
289 }
290
291 if let Some(ref include) = include_tables {
293 if !include.is_empty() {
294 for table in include {
295 cmd.arg("--table").arg(table);
296 }
297 }
298 }
299
300 cmd.arg("--host")
301 .arg(&parts.host)
302 .arg("--port")
303 .arg(parts.port.to_string())
304 .arg("--dbname")
305 .arg(&parts.database)
306 .arg(format!("--file={}", output_path_owned))
307 .env("PGPASSFILE", pgpass.path())
308 .stdout(Stdio::inherit())
309 .stderr(Stdio::inherit());
310
311 if let Some(user) = &parts.user {
313 cmd.arg("--username").arg(user);
314 }
315
316 for (env_var, value) in &env_vars {
318 cmd.env(env_var, value);
319 }
320
321 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
323 cmd.env(env_var, value);
324 }
325
326 cmd.status().context(
327 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
328 Install with:\n\
329 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
330 - macOS: brew install postgresql\n\
331 - RHEL/CentOS: sudo yum install postgresql",
332 )
333 },
334 3, Duration::from_secs(1), "pg_dump (dump schema)",
337 )
338 .await
339 .with_context(|| {
340 format!(
341 "pg_dump failed to dump schema for database '{}'.\n\
342 \n\
343 Common causes:\n\
344 - Database does not exist\n\
345 - Connection authentication failed\n\
346 - User lacks privileges to read database schema\n\
347 - Network connectivity issues\n\
348 - Connection timeout or network issues",
349 database
350 )
351 })?;
352
353 tracing::info!("✓ Schema dumped successfully");
354 Ok(())
355}
356
357pub async fn dump_data(
367 source_url: &str,
368 database: &str,
369 output_path: &str,
370 filter: &ReplicationFilter,
371) -> Result<()> {
372 let num_cpus = std::thread::available_parallelism()
374 .map(|n| n.get().min(8))
375 .unwrap_or(4);
376
377 tracing::info!(
378 "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
379 database,
380 output_path,
381 num_cpus
382 );
383
384 let parts = crate::utils::parse_postgres_url(source_url)
386 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
387 let pgpass = crate::utils::PgPassFile::new(&parts)
388 .context("Failed to create .pgpass file for authentication")?;
389
390 let env_vars = parts.to_pg_env_vars();
391 let output_path_owned = output_path.to_string();
392
393 let exclude_tables = get_data_excluded_tables_for_db(filter, database);
395 let include_tables = get_included_tables_for_db(filter, database);
396
397 crate::utils::retry_subprocess_with_backoff(
399 || {
400 let mut cmd = Command::new("pg_dump");
401 cmd.arg("--data-only")
402 .arg("--no-owner")
403 .arg("--format=directory") .arg("--blobs") .arg("--compress=9") .arg(format!("--jobs={}", num_cpus)) .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
412 if !exclude.is_empty() {
413 for table in exclude {
414 cmd.arg("--exclude-table-data").arg(table);
415 }
416 }
417 }
418
419 if let Some(ref include) = include_tables {
421 if !include.is_empty() {
422 for table in include {
423 cmd.arg("--table").arg(table);
424 }
425 }
426 }
427
428 cmd.arg("--host")
429 .arg(&parts.host)
430 .arg("--port")
431 .arg(parts.port.to_string())
432 .arg("--dbname")
433 .arg(&parts.database)
434 .arg(format!("--file={}", output_path_owned))
435 .env("PGPASSFILE", pgpass.path())
436 .stdout(Stdio::inherit())
437 .stderr(Stdio::inherit());
438
439 if let Some(user) = &parts.user {
441 cmd.arg("--username").arg(user);
442 }
443
444 for (env_var, value) in &env_vars {
446 cmd.env(env_var, value);
447 }
448
449 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
451 cmd.env(env_var, value);
452 }
453
454 cmd.status().context(
455 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
456 Install with:\n\
457 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
458 - macOS: brew install postgresql\n\
459 - RHEL/CentOS: sudo yum install postgresql",
460 )
461 },
462 3, Duration::from_secs(1), "pg_dump (dump data)",
465 )
466 .await
467 .with_context(|| {
468 format!(
469 "pg_dump failed to dump data for database '{}'.\n\
470 \n\
471 Common causes:\n\
472 - Database does not exist\n\
473 - Connection authentication failed\n\
474 - User lacks privileges to read table data\n\
475 - Network connectivity issues\n\
476 - Insufficient disk space for dump directory\n\
477 - Output directory already exists (pg_dump requires non-existent path)\n\
478 - Connection timeout or network issues",
479 database
480 )
481 })?;
482
483 tracing::info!(
484 "✓ Data dumped successfully using {} parallel jobs",
485 num_cpus
486 );
487 Ok(())
488}
489
490fn get_schema_excluded_tables_for_db(
495 filter: &ReplicationFilter,
496 db_name: &str,
497) -> Option<Vec<String>> {
498 let mut tables = BTreeSet::new();
499
500 if let Some(explicit) = filter.exclude_tables() {
503 for full_name in explicit {
504 let parts: Vec<&str> = full_name.split('.').collect();
505 if parts.len() == 2 && parts[0] == db_name {
506 tables.insert(format!("\"public\".\"{}\"", parts[1]));
508 }
509 }
510 }
511
512 if tables.is_empty() {
513 None
514 } else {
515 Some(tables.into_iter().collect())
516 }
517}
518
519fn get_data_excluded_tables_for_db(
524 filter: &ReplicationFilter,
525 db_name: &str,
526) -> Option<Vec<String>> {
527 let mut tables = BTreeSet::new();
528
529 if let Some(explicit) = filter.exclude_tables() {
532 for full_name in explicit {
533 let parts: Vec<&str> = full_name.split('.').collect();
534 if parts.len() == 2 && parts[0] == db_name {
535 tables.insert(format!("\"public\".\"{}\"", parts[1]));
537 }
538 }
539 }
540
541 for table in filter.schema_only_tables(db_name) {
543 tables.insert(table);
544 }
545
546 for (table, _) in filter.predicate_tables(db_name) {
547 tables.insert(table);
548 }
549
550 if tables.is_empty() {
551 None
552 } else {
553 Some(tables.into_iter().collect())
554 }
555}
556
557fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
560 filter.include_tables().map(|tables| {
561 tables
562 .iter()
563 .filter_map(|full_name| {
564 let parts: Vec<&str> = full_name.split('.').collect();
565 if parts.len() == 2 && parts[0] == db_name {
566 Some(format!("\"public\".\"{}\"", parts[1]))
568 } else {
569 None
570 }
571 })
572 .collect()
573 })
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Integration test: needs TEST_SOURCE_URL and a reachable server, so it is
    // ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let workdir = tempdir().unwrap();
        let dump_file = workdir.path().join("globals.sql");

        let outcome = dump_globals(&source_url, dump_file.to_str().unwrap()).await;

        assert!(outcome.is_ok());
        assert!(dump_file.exists());

        let dumped = std::fs::read_to_string(&dump_file).unwrap();
        assert!(dumped.contains("CREATE ROLE") || !dumped.is_empty());
    }

    // Integration test: needs TEST_SOURCE_URL and a reachable server, so it is
    // ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let workdir = tempdir().unwrap();
        let dump_file = workdir.path().join("schema.sql");

        // The database name is taken from the last path segment of the URL.
        let db_name = source_url.rsplit('/').next().unwrap_or("postgres");

        let no_filter = crate::filters::ReplicationFilter::empty();
        let outcome = dump_schema(
            &source_url,
            db_name,
            dump_file.to_str().unwrap(),
            &no_filter,
        )
        .await;

        assert!(outcome.is_ok());
        assert!(dump_file.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let excluded = vec![
            String::from("db1.table1"),
            String::from("db1.table2"),
            String::from("db2.table3"),
        ];
        let filter =
            crate::filters::ReplicationFilter::new(None, None, None, Some(excluded)).unwrap();

        let db1 = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(db1, ["\"public\".\"table1\"", "\"public\".\"table2\""]);

        let db2 = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, ["\"public\".\"table3\""]);

        let db3 = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |names| names.is_empty()));
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let excluded = vec![
            String::from("db1.table1"),
            String::from("db1.table2"),
            String::from("db2.table3"),
        ];
        let filter =
            crate::filters::ReplicationFilter::new(None, None, None, Some(excluded)).unwrap();

        let db1 = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(db1, ["\"public\".\"table1\"", "\"public\".\"table2\""]);

        let db2 = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, ["\"public\".\"table3\""]);

        let db3 = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |names| names.is_empty()));
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let included = vec![
            String::from("db1.users"),
            String::from("db1.orders"),
            String::from("db2.products"),
        ];
        let filter =
            crate::filters::ReplicationFilter::new(None, None, Some(included), None).unwrap();

        // Included tables keep the order in which the filter lists them.
        let db1 = get_included_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(db1, ["\"public\".\"users\"", "\"public\".\"orders\""]);

        let db2 = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, ["\"public\".\"products\""]);

        let db3 = get_included_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |names| names.is_empty()));
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let no_filter = crate::filters::ReplicationFilter::empty();
        assert!(get_schema_excluded_tables_for_db(&no_filter, "db1").is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let no_filter = crate::filters::ReplicationFilter::empty();
        assert!(get_data_excluded_tables_for_db(&no_filter, "db1").is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let no_filter = crate::filters::ReplicationFilter::empty();
        assert!(get_included_tables_for_db(&no_filter, "db1").is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let input = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let output = rewrite_create_role_statements(input).expect("rewrite expected");

        let expected_fragments = [
            "DO $$",
            "Role replicator already exists",
            "CREATE ROLE replicator WITH LOGIN;",
            "ALTER ROLE replicator WITH LOGIN;",
        ];
        for fragment in expected_fragments {
            assert!(output.contains(fragment));
        }
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let input = " CREATE ROLE \"Andre Admin\";\n";
        let output = rewrite_create_role_statements(input).expect("rewrite expected");

        let expected_fragments = [
            "DO $$",
            "Andre Admin already exists",
            "CREATE ROLE \"Andre Admin\"",
        ];
        for fragment in expected_fragments {
            assert!(output.contains(fragment));
        }
        // Leading indentation of the original line is carried over.
        assert!(output.starts_with(" DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        let input = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(input).is_none());
    }
}