database_replicator/migration/
dump.rs1use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13 tracing::info!("Dumping global objects to {}", output_path);
14
15 let parts = crate::utils::parse_postgres_url(source_url)
17 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18 let pgpass = crate::utils::PgPassFile::new(&parts)
19 .context("Failed to create .pgpass file for authentication")?;
20
21 let env_vars = parts.to_pg_env_vars();
22 let output_path_owned = output_path.to_string();
23
24 crate::utils::retry_subprocess_with_backoff(
26 || {
27 let mut cmd = Command::new("pg_dumpall");
28 cmd.arg("--globals-only")
29 .arg("--no-role-passwords") .arg("--verbose") .arg("--host")
32 .arg(&parts.host)
33 .arg("--port")
34 .arg(parts.port.to_string())
35 .arg("--database")
36 .arg(&parts.database)
37 .arg(format!("--file={}", output_path_owned))
38 .env("PGPASSFILE", pgpass.path())
39 .stdout(Stdio::inherit())
40 .stderr(Stdio::inherit());
41
42 if let Some(user) = &parts.user {
44 cmd.arg("--username").arg(user);
45 }
46
47 for (env_var, value) in &env_vars {
49 cmd.env(env_var, value);
50 }
51
52 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54 cmd.env(env_var, value);
55 }
56
57 cmd.status().context(
58 "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59 Install with:\n\
60 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61 - macOS: brew install postgresql\n\
62 - RHEL/CentOS: sudo yum install postgresql",
63 )
64 },
65 3, Duration::from_secs(1), "pg_dumpall (dump globals)",
68 )
69 .await
70 .context(
71 "pg_dumpall failed to dump global objects.\n\
72 \n\
73 Common causes:\n\
74 - Connection authentication failed\n\
75 - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76 - Network connectivity issues\n\
77 - Invalid connection string\n\
78 - Connection timeout or network issues",
79 )?;
80
81 tracing::info!("✓ Global objects dumped successfully");
82 Ok(())
83}
84
85pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96 let content = fs::read_to_string(path)
97 .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99 if let Some(updated) = rewrite_create_role_statements(&content) {
100 fs::write(path, updated)
101 .with_context(|| format!("Failed to update globals dump at {}", path))?;
102 }
103
104 Ok(())
105}
106
107pub fn remove_superuser_from_globals(path: &str) -> Result<()> {
113 let content = fs::read_to_string(path)
114 .with_context(|| format!("Failed to read globals dump at {}", path))?;
115
116 let mut updated = String::with_capacity(content.len());
117 let mut modified = false;
118 for line in content.lines() {
119 if line.contains("ALTER ROLE") && line.contains("SUPERUSER") {
120 updated.push_str("-- ");
121 updated.push_str(line);
122 updated.push('\n');
123 modified = true;
124 } else {
125 updated.push_str(line);
126 updated.push('\n');
127 }
128 }
129
130 if modified {
131 fs::write(path, updated)
132 .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
133 }
134
135 Ok(())
136}
137
138pub fn remove_restricted_guc_settings(path: &str) -> Result<()> {
144 const BLOCKED_PATTERNS: &[&str] = &["set log_statement"];
145
146 let content = fs::read_to_string(path)
147 .with_context(|| format!("Failed to read globals dump at {}", path))?;
148
149 let mut updated = String::with_capacity(content.len());
150 let mut modified = false;
151
152 for line in content.lines() {
153 let trimmed = line.trim_start();
154 if trimmed.starts_with("--") {
155 updated.push_str(line);
156 updated.push('\n');
157 continue;
158 }
159
160 let lower = line.to_ascii_lowercase();
161 let is_blocked = BLOCKED_PATTERNS
162 .iter()
163 .any(|pattern| lower.contains(pattern));
164
165 if is_blocked {
166 updated.push_str("-- ");
167 updated.push_str(line);
168 updated.push('\n');
169 modified = true;
170 } else {
171 updated.push_str(line);
172 updated.push('\n');
173 }
174 }
175
176 if modified {
177 fs::write(path, updated)
178 .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
179 }
180
181 Ok(())
182}
183
184fn rewrite_create_role_statements(sql: &str) -> Option<String> {
185 if sql.is_empty() {
186 return None;
187 }
188
189 let mut output = String::with_capacity(sql.len() + 1024);
190 let mut modified = false;
191 let mut cursor = 0;
192
193 while cursor < sql.len() {
194 if let Some(rel_pos) = sql[cursor..].find('\n') {
195 let end = cursor + rel_pos + 1;
196 let chunk = &sql[cursor..end];
197 if let Some(transformed) = wrap_create_role_line(chunk) {
198 output.push_str(&transformed);
199 modified = true;
200 } else {
201 output.push_str(chunk);
202 }
203 cursor = end;
204 } else {
205 let chunk = &sql[cursor..];
206 if let Some(transformed) = wrap_create_role_line(chunk) {
207 output.push_str(&transformed);
208 modified = true;
209 } else {
210 output.push_str(chunk);
211 }
212 break;
213 }
214 }
215
216 if modified {
217 Some(output)
218 } else {
219 None
220 }
221}
222
223fn wrap_create_role_line(chunk: &str) -> Option<String> {
224 let trimmed = chunk.trim_start();
225 if !trimmed.starts_with("CREATE ROLE ") {
226 return None;
227 }
228
229 let statement = trimmed.trim_end();
230 let statement_body = statement.trim_end_matches(';').trim_end();
231 let leading_ws_len = chunk.len() - trimmed.len();
232 let leading_ws = &chunk[..leading_ws_len];
233 let newline = if chunk.ends_with("\r\n") {
234 "\r\n"
235 } else if chunk.ends_with('\n') {
236 "\n"
237 } else {
238 ""
239 };
240
241 let role_token = extract_role_token(statement_body)?;
242
243 let notice_name = escape_single_quotes(&unquote_role_name(&role_token));
244
245 let mut block = String::with_capacity(chunk.len() + 128);
246 block.push_str(leading_ws);
247 block.push_str("DO $$\n");
248 block.push_str(leading_ws);
249 block.push_str("BEGIN\n");
250 block.push_str(leading_ws);
251 block.push_str(" ");
252 block.push_str(statement_body);
253 block.push_str(";\n");
254 block.push_str(leading_ws);
255 block.push_str("EXCEPTION\n");
256 block.push_str(leading_ws);
257 block.push_str(" WHEN duplicate_object THEN\n");
258 block.push_str(leading_ws);
259 block.push_str(" RAISE NOTICE 'Role ");
260 block.push_str(¬ice_name);
261 block.push_str(" already exists on target, skipping CREATE ROLE';\n");
262 block.push_str(leading_ws);
263 block.push_str("END $$;");
264
265 if !newline.is_empty() {
266 block.push_str(newline);
267 }
268
269 Some(block)
270}
271
/// Extracts the role-name token that follows `CREATE ROLE` in `statement`.
///
/// For a double-quoted identifier the returned token includes the surrounding
/// quotes, and doubled quotes (`""`) inside it are treated as escaped quote
/// characters. For an unquoted identifier the token runs up to the first
/// whitespace character or `;`.
///
/// Returns `None` when `statement` does not start with `CREATE ROLE`, when no
/// name follows it, or when a quoted identifier is never closed (so callers do
/// not build SQL around a malformed name).
fn extract_role_token(statement: &str) -> Option<String> {
    let remainder = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if remainder.starts_with('"') {
        // '"' is ASCII, so scanning bytes is UTF-8 safe and every index we slice
        // at sits on a character boundary.
        let bytes = remainder.as_bytes();
        let mut idx = 1;
        while idx < bytes.len() {
            if bytes[idx] != b'"' {
                idx += 1;
            } else if bytes.get(idx + 1) == Some(&b'"') {
                // Escaped quote inside the identifier: skip both characters.
                idx += 2;
            } else {
                // Closing quote: the token includes it.
                return Some(remainder[..=idx].to_string());
            }
        }
        // Ran off the end without finding the closing quote.
        None
    } else {
        let end = remainder
            .find(|ch: char| ch.is_whitespace() || ch == ';')
            .unwrap_or(remainder.len());
        (end > 0).then(|| remainder[..end].to_string())
    }
}
309
/// Returns the plain role name for an identifier token.
///
/// A token wrapped in double quotes has the quotes removed and every doubled
/// quote (`""`) collapsed to a single `"`. Any other token is returned unchanged.
fn unquote_role_name(token: &str) -> String {
    let quoted_inner = token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'));

    match quoted_inner {
        Some(inner) => inner.replace("\"\"", "\""),
        None => token.to_owned(),
    }
}
318
/// Doubles every single quote in `value` so it can sit inside a single-quoted
/// SQL string literal.
fn escape_single_quotes(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push('\'');
        }
        escaped.push(ch);
    }
    escaped
}
322
323pub async fn dump_schema(
325 source_url: &str,
326 database: &str,
327 output_path: &str,
328 filter: &ReplicationFilter,
329) -> Result<()> {
330 tracing::info!(
331 "Dumping schema for database '{}' to {}",
332 database,
333 output_path
334 );
335
336 let parts = crate::utils::parse_postgres_url(source_url)
338 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
339 let pgpass = crate::utils::PgPassFile::new(&parts)
340 .context("Failed to create .pgpass file for authentication")?;
341
342 let env_vars = parts.to_pg_env_vars();
343 let output_path_owned = output_path.to_string();
344
345 let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
347 let include_tables = get_included_tables_for_db(filter, database);
348
349 crate::utils::retry_subprocess_with_backoff(
351 || {
352 let mut cmd = Command::new("pg_dump");
353 cmd.arg("--schema-only")
354 .arg("--no-owner") .arg("--no-privileges") .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
361 if !exclude.is_empty() {
362 for table in exclude {
363 cmd.arg("--exclude-table").arg(table);
364 }
365 }
366 }
367
368 if let Some(ref include) = include_tables {
370 if !include.is_empty() {
371 for table in include {
372 cmd.arg("--table").arg(table);
373 }
374 }
375 }
376
377 cmd.arg("--host")
378 .arg(&parts.host)
379 .arg("--port")
380 .arg(parts.port.to_string())
381 .arg("--dbname")
382 .arg(&parts.database)
383 .arg(format!("--file={}", output_path_owned))
384 .env("PGPASSFILE", pgpass.path())
385 .stdout(Stdio::inherit())
386 .stderr(Stdio::inherit());
387
388 if let Some(user) = &parts.user {
390 cmd.arg("--username").arg(user);
391 }
392
393 for (env_var, value) in &env_vars {
395 cmd.env(env_var, value);
396 }
397
398 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
400 cmd.env(env_var, value);
401 }
402
403 cmd.status().context(
404 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
405 Install with:\n\
406 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
407 - macOS: brew install postgresql\n\
408 - RHEL/CentOS: sudo yum install postgresql",
409 )
410 },
411 3, Duration::from_secs(1), "pg_dump (dump schema)",
414 )
415 .await
416 .with_context(|| {
417 format!(
418 "pg_dump failed to dump schema for database '{}'.\n\
419 \n\
420 Common causes:\n\
421 - Database does not exist\n\
422 - Connection authentication failed\n\
423 - User lacks privileges to read database schema\n\
424 - Network connectivity issues\n\
425 - Connection timeout or network issues",
426 database
427 )
428 })?;
429
430 tracing::info!("✓ Schema dumped successfully");
431 Ok(())
432}
433
434pub async fn dump_data(
444 source_url: &str,
445 database: &str,
446 output_path: &str,
447 filter: &ReplicationFilter,
448) -> Result<()> {
449 let num_cpus = std::thread::available_parallelism()
451 .map(|n| n.get().min(8))
452 .unwrap_or(4);
453
454 tracing::info!(
455 "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
456 database,
457 output_path,
458 num_cpus
459 );
460
461 let parts = crate::utils::parse_postgres_url(source_url)
463 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
464 let pgpass = crate::utils::PgPassFile::new(&parts)
465 .context("Failed to create .pgpass file for authentication")?;
466
467 let env_vars = parts.to_pg_env_vars();
468 let output_path_owned = output_path.to_string();
469
470 let exclude_tables = get_data_excluded_tables_for_db(filter, database);
472 let include_tables = get_included_tables_for_db(filter, database);
473
474 crate::utils::retry_subprocess_with_backoff(
476 || {
477 let mut cmd = Command::new("pg_dump");
478 cmd.arg("--data-only")
479 .arg("--no-owner")
480 .arg("--format=directory") .arg("--blobs") .arg("--compress=9") .arg(format!("--jobs={}", num_cpus)) .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
489 if !exclude.is_empty() {
490 for table in exclude {
491 cmd.arg("--exclude-table-data").arg(table);
492 }
493 }
494 }
495
496 if let Some(ref include) = include_tables {
498 if !include.is_empty() {
499 for table in include {
500 cmd.arg("--table").arg(table);
501 }
502 }
503 }
504
505 cmd.arg("--host")
506 .arg(&parts.host)
507 .arg("--port")
508 .arg(parts.port.to_string())
509 .arg("--dbname")
510 .arg(&parts.database)
511 .arg(format!("--file={}", output_path_owned))
512 .env("PGPASSFILE", pgpass.path())
513 .stdout(Stdio::inherit())
514 .stderr(Stdio::inherit());
515
516 if let Some(user) = &parts.user {
518 cmd.arg("--username").arg(user);
519 }
520
521 for (env_var, value) in &env_vars {
523 cmd.env(env_var, value);
524 }
525
526 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
528 cmd.env(env_var, value);
529 }
530
531 cmd.status().context(
532 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
533 Install with:\n\
534 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
535 - macOS: brew install postgresql\n\
536 - RHEL/CentOS: sudo yum install postgresql",
537 )
538 },
539 3, Duration::from_secs(1), "pg_dump (dump data)",
542 )
543 .await
544 .with_context(|| {
545 format!(
546 "pg_dump failed to dump data for database '{}'.\n\
547 \n\
548 Common causes:\n\
549 - Database does not exist\n\
550 - Connection authentication failed\n\
551 - User lacks privileges to read table data\n\
552 - Network connectivity issues\n\
553 - Insufficient disk space for dump directory\n\
554 - Output directory already exists (pg_dump requires non-existent path)\n\
555 - Connection timeout or network issues",
556 database
557 )
558 })?;
559
560 tracing::info!(
561 "✓ Data dumped successfully using {} parallel jobs",
562 num_cpus
563 );
564 Ok(())
565}
566
567fn get_schema_excluded_tables_for_db(
572 filter: &ReplicationFilter,
573 db_name: &str,
574) -> Option<Vec<String>> {
575 let mut tables = BTreeSet::new();
576
577 if let Some(explicit) = filter.exclude_tables() {
580 for full_name in explicit {
581 let parts: Vec<&str> = full_name.split('.').collect();
582 if parts.len() == 2 && parts[0] == db_name {
583 tables.insert(format!("\"public\".\"{}\"", parts[1]));
585 }
586 }
587 }
588
589 if tables.is_empty() {
590 None
591 } else {
592 Some(tables.into_iter().collect())
593 }
594}
595
596fn get_data_excluded_tables_for_db(
601 filter: &ReplicationFilter,
602 db_name: &str,
603) -> Option<Vec<String>> {
604 let mut tables = BTreeSet::new();
605
606 if let Some(explicit) = filter.exclude_tables() {
609 for full_name in explicit {
610 let parts: Vec<&str> = full_name.split('.').collect();
611 if parts.len() == 2 && parts[0] == db_name {
612 tables.insert(format!("\"public\".\"{}\"", parts[1]));
614 }
615 }
616 }
617
618 for table in filter.schema_only_tables(db_name) {
620 tables.insert(table);
621 }
622
623 for (table, _) in filter.predicate_tables(db_name) {
624 tables.insert(table);
625 }
626
627 if tables.is_empty() {
628 None
629 } else {
630 Some(tables.into_iter().collect())
631 }
632}
633
634fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
637 filter.include_tables().map(|tables| {
638 tables
639 .iter()
640 .filter_map(|full_name| {
641 let parts: Vec<&str> = full_name.split('.').collect();
642 if parts.len() == 2 && parts[0] == db_name {
643 Some(format!("\"public\".\"{}\"", parts[1]))
645 } else {
646 None
647 }
648 })
649 .collect()
650 })
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a filter that only sets the exclude-table list (fourth argument).
    fn filter_excluding(names: &[&str]) -> crate::filters::ReplicationFilter {
        let excluded: Vec<String> = names.iter().map(|name| name.to_string()).collect();
        crate::filters::ReplicationFilter::new(None, None, None, Some(excluded)).unwrap()
    }

    /// Builds a filter that only sets the include-table list (third argument).
    fn filter_including(names: &[&str]) -> crate::filters::ReplicationFilter {
        let included: Vec<String> = names.iter().map(|name| name.to_string()).collect();
        crate::filters::ReplicationFilter::new(None, None, Some(included), None).unwrap()
    }

    // Needs a live source database; run with TEST_SOURCE_URL set.
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("globals.sql");

        let outcome = dump_globals(&url, output.to_str().unwrap()).await;

        assert!(outcome.is_ok());
        assert!(output.exists());

        let content = std::fs::read_to_string(&output).unwrap();
        assert!(content.contains("CREATE ROLE") || !content.is_empty());
    }

    // Needs a live source database; run with TEST_SOURCE_URL set.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let dir = tempdir().unwrap();
        let output = dir.path().join("schema.sql");

        // The last path segment of the URL is taken as the database name.
        let db = url.split('/').next_back().unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let outcome = dump_schema(&url, db, output.to_str().unwrap(), &filter).await;

        assert!(outcome.is_ok());
        assert!(output.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = filter_excluding(&["db1.table1", "db1.table2", "db2.table3"]);

        let db1 = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            db1,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let db2 = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, vec!["\"public\".\"table3\""]);

        let db3 = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = filter_excluding(&["db1.table1", "db1.table2", "db2.table3"]);

        let db1 = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            db1,
            vec!["\"public\".\"table1\"", "\"public\".\"table2\""]
        );

        let db2 = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, vec!["\"public\".\"table3\""]);

        let db3 = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = filter_including(&["db1.users", "db1.orders", "db2.products"]);

        let db1 = get_included_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(
            db1,
            vec!["\"public\".\"users\"", "\"public\".\"orders\""]
        );

        let db2 = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, vec!["\"public\".\"products\""]);

        let db3 = get_included_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(get_schema_excluded_tables_for_db(&filter, "db1").is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(get_data_excluded_tables_for_db(&filter, "db1").is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(get_included_tables_for_db(&filter, "db1").is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let sql = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        for expected in [
            "DO $$",
            "Role replicator already exists",
            "CREATE ROLE replicator WITH LOGIN;",
            "ALTER ROLE replicator WITH LOGIN;",
        ] {
            assert!(rewritten.contains(expected));
        }
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let sql = " CREATE ROLE \"Andre Admin\";\n";
        let rewritten = rewrite_create_role_statements(sql).expect("rewrite expected");

        for expected in [
            "DO $$",
            "Andre Admin already exists",
            "CREATE ROLE \"Andre Admin\"",
        ] {
            assert!(rewritten.contains(expected));
        }
        // Leading indentation of the original line is carried onto the block.
        assert!(rewritten.starts_with(" DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        let sql = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(sql).is_none());
    }
}