database_replicator/migration/
dump.rs1use crate::filters::ReplicationFilter;
5use anyhow::{Context, Result};
6use std::collections::BTreeSet;
7use std::fs;
8use std::process::{Command, Stdio};
9use std::time::Duration;
10
11pub async fn dump_globals(source_url: &str, output_path: &str) -> Result<()> {
13 tracing::info!("Dumping global objects to {}", output_path);
14
15 let parts = crate::utils::parse_postgres_url(source_url)
17 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
18 let pgpass = crate::utils::PgPassFile::new(&parts)
19 .context("Failed to create .pgpass file for authentication")?;
20
21 let env_vars = parts.to_pg_env_vars();
22 let output_path_owned = output_path.to_string();
23
24 crate::utils::retry_subprocess_with_backoff(
26 || {
27 let mut cmd = Command::new("pg_dumpall");
28 cmd.arg("--globals-only")
29 .arg("--no-role-passwords") .arg("--verbose") .arg("--host")
32 .arg(&parts.host)
33 .arg("--port")
34 .arg(parts.port.to_string())
35 .arg("--database")
36 .arg(&parts.database)
37 .arg(format!("--file={}", output_path_owned))
38 .env("PGPASSFILE", pgpass.path())
39 .stdout(Stdio::inherit())
40 .stderr(Stdio::inherit());
41
42 if let Some(user) = &parts.user {
44 cmd.arg("--username").arg(user);
45 }
46
47 for (env_var, value) in &env_vars {
49 cmd.env(env_var, value);
50 }
51
52 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
54 cmd.env(env_var, value);
55 }
56
57 cmd.status().context(
58 "Failed to execute pg_dumpall. Is PostgreSQL client installed?\n\
59 Install with:\n\
60 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
61 - macOS: brew install postgresql\n\
62 - RHEL/CentOS: sudo yum install postgresql",
63 )
64 },
65 3, Duration::from_secs(1), "pg_dumpall (dump globals)",
68 )
69 .await
70 .context(
71 "pg_dumpall failed to dump global objects.\n\
72 \n\
73 Common causes:\n\
74 - Connection authentication failed\n\
75 - User lacks sufficient privileges (need SUPERUSER or pg_read_all_settings role)\n\
76 - Network connectivity issues\n\
77 - Invalid connection string\n\
78 - Connection timeout or network issues",
79 )?;
80
81 tracing::info!("✓ Global objects dumped successfully");
82 Ok(())
83}
84
85pub fn sanitize_globals_dump(path: &str) -> Result<()> {
96 let content = fs::read_to_string(path)
97 .with_context(|| format!("Failed to read globals dump at {}", path))?;
98
99 if let Some(updated) = rewrite_create_role_statements(&content) {
100 fs::write(path, updated)
101 .with_context(|| format!("Failed to update globals dump at {}", path))?;
102 }
103
104 Ok(())
105}
106
107pub fn remove_superuser_from_globals(path: &str) -> Result<()> {
113 let content = fs::read_to_string(path)
114 .with_context(|| format!("Failed to read globals dump at {}", path))?;
115
116 let mut updated = String::with_capacity(content.len());
117 let mut modified = false;
118 for line in content.lines() {
119 if line.contains("ALTER ROLE") && line.contains("SUPERUSER") {
120 updated.push_str("-- ");
121 updated.push_str(line);
122 updated.push('\n');
123 modified = true;
124 } else {
125 updated.push_str(line);
126 updated.push('\n');
127 }
128 }
129
130 if modified {
131 fs::write(path, updated)
132 .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
133 }
134
135 Ok(())
136}
137
138pub fn remove_restricted_guc_settings(path: &str) -> Result<()> {
144 let content = fs::read_to_string(path)
145 .with_context(|| format!("Failed to read globals dump at {}", path))?;
146
147 let mut updated = String::with_capacity(content.len());
148 let mut modified = false;
149
150 for line in content.lines() {
151 let lower_line = line.to_ascii_lowercase();
152 if lower_line.contains("alter role") && lower_line.contains("set") {
153 updated.push_str("-- ");
154 updated.push_str(line);
155 updated.push('\n');
156 modified = true;
157 } else {
158 updated.push_str(line);
159 updated.push('\n');
160 }
161 }
162
163 if modified {
164 fs::write(path, updated)
165 .with_context(|| format!("Failed to write sanitized globals dump to {}", path))?;
166 }
167
168 Ok(())
169}
170
171fn rewrite_create_role_statements(sql: &str) -> Option<String> {
172 if sql.is_empty() {
173 return None;
174 }
175
176 let mut output = String::with_capacity(sql.len() + 1024);
177 let mut modified = false;
178 let mut cursor = 0;
179
180 while cursor < sql.len() {
181 if let Some(rel_pos) = sql[cursor..].find('\n') {
182 let end = cursor + rel_pos + 1;
183 let chunk = &sql[cursor..end];
184 if let Some(transformed) = wrap_create_role_line(chunk) {
185 output.push_str(&transformed);
186 modified = true;
187 } else {
188 output.push_str(chunk);
189 }
190 cursor = end;
191 } else {
192 let chunk = &sql[cursor..];
193 if let Some(transformed) = wrap_create_role_line(chunk) {
194 output.push_str(&transformed);
195 modified = true;
196 } else {
197 output.push_str(chunk);
198 }
199 break;
200 }
201 }
202
203 if modified {
204 Some(output)
205 } else {
206 None
207 }
208}
209
210fn wrap_create_role_line(chunk: &str) -> Option<String> {
211 let trimmed = chunk.trim_start();
212 if !trimmed.starts_with("CREATE ROLE ") {
213 return None;
214 }
215
216 let statement = trimmed.trim_end();
217 let statement_body = statement.trim_end_matches(';').trim_end();
218 let leading_ws_len = chunk.len() - trimmed.len();
219 let leading_ws = &chunk[..leading_ws_len];
220 let newline = if chunk.ends_with("\r\n") {
221 "\r\n"
222 } else if chunk.ends_with('\n') {
223 "\n"
224 } else {
225 ""
226 };
227
228 let role_token = extract_role_token(statement_body)?;
229
230 let notice_name = escape_single_quotes(&unquote_role_name(&role_token));
231
232 let mut block = String::with_capacity(chunk.len() + 128);
233 block.push_str(leading_ws);
234 block.push_str("DO $$\n");
235 block.push_str(leading_ws);
236 block.push_str("BEGIN\n");
237 block.push_str(leading_ws);
238 block.push_str(" ");
239 block.push_str(statement_body);
240 block.push_str(";\n");
241 block.push_str(leading_ws);
242 block.push_str("EXCEPTION\n");
243 block.push_str(leading_ws);
244 block.push_str(" WHEN duplicate_object THEN\n");
245 block.push_str(leading_ws);
246 block.push_str(" RAISE NOTICE 'Role ");
247 block.push_str(¬ice_name);
248 block.push_str(" already exists on target, skipping CREATE ROLE';\n");
249 block.push_str(leading_ws);
250 block.push_str("END $$;");
251
252 if !newline.is_empty() {
253 block.push_str(newline);
254 }
255
256 Some(block)
257}
258
/// Extracts the role identifier that follows `CREATE ROLE` in `statement`.
///
/// * A double-quoted identifier is returned with its quotes; `""` inside the
///   quotes is treated as an escaped quote. If the closing quote is missing,
///   everything after `CREATE ROLE` is returned.
/// * An unquoted identifier ends at the first whitespace character or `;`.
///
/// Returns `None` when `statement` does not start with `CREATE ROLE` or when
/// an unquoted identifier would be empty.
fn extract_role_token(statement: &str) -> Option<String> {
    let rest = statement.strip_prefix("CREATE ROLE")?.trim_start();

    if let Some(after_quote) = rest.strip_prefix('"') {
        // Without a closing quote the whole remainder is the token.
        let mut token_len = rest.len();
        let mut chars = after_quote.char_indices().peekable();

        while let Some((offset, ch)) = chars.next() {
            if ch != '"' {
                continue;
            }
            if matches!(chars.peek(), Some((_, '"'))) {
                // Doubled quote: an escaped `"` inside the identifier.
                chars.next();
                continue;
            }
            // Opening quote + bytes before the closing quote + closing quote.
            token_len = 1 + offset + 1;
            break;
        }

        return Some(rest[..token_len].to_string());
    }

    let end = rest
        .find(|ch: char| ch.is_whitespace() || ch == ';')
        .unwrap_or(rest.len());

    if end == 0 {
        None
    } else {
        Some(rest[..end].to_string())
    }
}
296
/// Converts a role token into the bare role name.
///
/// A token enclosed in a pair of double quotes has the quotes removed and
/// every `""` collapsed to `"`. Any other token is returned unchanged.
fn unquote_role_name(token: &str) -> String {
    let quoted_inner = token
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'));

    match quoted_inner {
        Some(inner) => inner.replace("\"\"", "\""),
        None => token.to_string(),
    }
}
305
/// Doubles every `'` in `value` so it can sit inside a single-quoted SQL
/// string literal.
fn escape_single_quotes(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push('\'');
        }
        escaped.push(ch);
    }
    escaped
}
309
310pub async fn dump_schema(
312 source_url: &str,
313 database: &str,
314 output_path: &str,
315 filter: &ReplicationFilter,
316) -> Result<()> {
317 tracing::info!(
318 "Dumping schema for database '{}' to {}",
319 database,
320 output_path
321 );
322
323 let parts = crate::utils::parse_postgres_url(source_url)
325 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
326 let pgpass = crate::utils::PgPassFile::new(&parts)
327 .context("Failed to create .pgpass file for authentication")?;
328
329 let env_vars = parts.to_pg_env_vars();
330 let output_path_owned = output_path.to_string();
331
332 let exclude_tables = get_schema_excluded_tables_for_db(filter, database);
334 let include_tables = get_included_tables_for_db(filter, database);
335
336 crate::utils::retry_subprocess_with_backoff(
338 || {
339 let mut cmd = Command::new("pg_dump");
340 cmd.arg("--schema-only")
341 .arg("--no-owner") .arg("--no-privileges") .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
348 if !exclude.is_empty() {
349 for table in exclude {
350 cmd.arg("--exclude-table").arg(table);
351 }
352 }
353 }
354
355 if let Some(ref include) = include_tables {
357 if !include.is_empty() {
358 for table in include {
359 cmd.arg("--table").arg(table);
360 }
361 }
362 }
363
364 cmd.arg("--host")
365 .arg(&parts.host)
366 .arg("--port")
367 .arg(parts.port.to_string())
368 .arg("--dbname")
369 .arg(&parts.database)
370 .arg(format!("--file={}", output_path_owned))
371 .env("PGPASSFILE", pgpass.path())
372 .stdout(Stdio::inherit())
373 .stderr(Stdio::inherit());
374
375 if let Some(user) = &parts.user {
377 cmd.arg("--username").arg(user);
378 }
379
380 for (env_var, value) in &env_vars {
382 cmd.env(env_var, value);
383 }
384
385 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
387 cmd.env(env_var, value);
388 }
389
390 cmd.status().context(
391 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
392 Install with:\n\
393 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
394 - macOS: brew install postgresql\n\
395 - RHEL/CentOS: sudo yum install postgresql",
396 )
397 },
398 3, Duration::from_secs(1), "pg_dump (dump schema)",
401 )
402 .await
403 .with_context(|| {
404 format!(
405 "pg_dump failed to dump schema for database '{}'.\n\
406 \n\
407 Common causes:\n\
408 - Database does not exist\n\
409 - Connection authentication failed\n\
410 - User lacks privileges to read database schema\n\
411 - Network connectivity issues\n\
412 - Connection timeout or network issues",
413 database
414 )
415 })?;
416
417 tracing::info!("✓ Schema dumped successfully");
418 Ok(())
419}
420
421pub async fn dump_data(
431 source_url: &str,
432 database: &str,
433 output_path: &str,
434 filter: &ReplicationFilter,
435) -> Result<()> {
436 let num_cpus = std::thread::available_parallelism()
438 .map(|n| n.get().min(8))
439 .unwrap_or(4);
440
441 tracing::info!(
442 "Dumping data for database '{}' to {} (parallel={}, compression=9, format=directory)",
443 database,
444 output_path,
445 num_cpus
446 );
447
448 let parts = crate::utils::parse_postgres_url(source_url)
450 .with_context(|| format!("Failed to parse source URL: {}", source_url))?;
451 let pgpass = crate::utils::PgPassFile::new(&parts)
452 .context("Failed to create .pgpass file for authentication")?;
453
454 let env_vars = parts.to_pg_env_vars();
455 let output_path_owned = output_path.to_string();
456
457 let exclude_tables = get_data_excluded_tables_for_db(filter, database);
459 let include_tables = get_included_tables_for_db(filter, database);
460
461 crate::utils::retry_subprocess_with_backoff(
463 || {
464 let mut cmd = Command::new("pg_dump");
465 cmd.arg("--data-only")
466 .arg("--no-owner")
467 .arg("--format=directory") .arg("--blobs") .arg("--compress=9") .arg(format!("--jobs={}", num_cpus)) .arg("--verbose"); if let Some(ref exclude) = exclude_tables {
476 if !exclude.is_empty() {
477 for table in exclude {
478 cmd.arg("--exclude-table-data").arg(table);
479 }
480 }
481 }
482
483 if let Some(ref include) = include_tables {
485 if !include.is_empty() {
486 for table in include {
487 cmd.arg("--table").arg(table);
488 }
489 }
490 }
491
492 cmd.arg("--host")
493 .arg(&parts.host)
494 .arg("--port")
495 .arg(parts.port.to_string())
496 .arg("--dbname")
497 .arg(&parts.database)
498 .arg(format!("--file={}", output_path_owned))
499 .env("PGPASSFILE", pgpass.path())
500 .stdout(Stdio::inherit())
501 .stderr(Stdio::inherit());
502
503 if let Some(user) = &parts.user {
505 cmd.arg("--username").arg(user);
506 }
507
508 for (env_var, value) in &env_vars {
510 cmd.env(env_var, value);
511 }
512
513 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
515 cmd.env(env_var, value);
516 }
517
518 cmd.status().context(
519 "Failed to execute pg_dump. Is PostgreSQL client installed?\n\
520 Install with:\n\
521 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
522 - macOS: brew install postgresql\n\
523 - RHEL/CentOS: sudo yum install postgresql",
524 )
525 },
526 3, Duration::from_secs(1), "pg_dump (dump data)",
529 )
530 .await
531 .with_context(|| {
532 format!(
533 "pg_dump failed to dump data for database '{}'.\n\
534 \n\
535 Common causes:\n\
536 - Database does not exist\n\
537 - Connection authentication failed\n\
538 - User lacks privileges to read table data\n\
539 - Network connectivity issues\n\
540 - Insufficient disk space for dump directory\n\
541 - Output directory already exists (pg_dump requires non-existent path)\n\
542 - Connection timeout or network issues",
543 database
544 )
545 })?;
546
547 tracing::info!(
548 "✓ Data dumped successfully using {} parallel jobs",
549 num_cpus
550 );
551 Ok(())
552}
553
554fn get_schema_excluded_tables_for_db(
559 filter: &ReplicationFilter,
560 db_name: &str,
561) -> Option<Vec<String>> {
562 let mut tables = BTreeSet::new();
563
564 if let Some(explicit) = filter.exclude_tables() {
567 for full_name in explicit {
568 let parts: Vec<&str> = full_name.split('.').collect();
569 if parts.len() == 2 && parts[0] == db_name {
570 tables.insert(format!("\"public\".\"{}\"", parts[1]));
572 }
573 }
574 }
575
576 if tables.is_empty() {
577 None
578 } else {
579 Some(tables.into_iter().collect())
580 }
581}
582
583fn get_data_excluded_tables_for_db(
588 filter: &ReplicationFilter,
589 db_name: &str,
590) -> Option<Vec<String>> {
591 let mut tables = BTreeSet::new();
592
593 if let Some(explicit) = filter.exclude_tables() {
596 for full_name in explicit {
597 let parts: Vec<&str> = full_name.split('.').collect();
598 if parts.len() == 2 && parts[0] == db_name {
599 tables.insert(format!("\"public\".\"{}\"", parts[1]));
601 }
602 }
603 }
604
605 for table in filter.schema_only_tables(db_name) {
607 tables.insert(table);
608 }
609
610 for (table, _) in filter.predicate_tables(db_name) {
611 tables.insert(table);
612 }
613
614 if tables.is_empty() {
615 None
616 } else {
617 Some(tables.into_iter().collect())
618 }
619}
620
621fn get_included_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
624 filter.include_tables().map(|tables| {
625 tables
626 .iter()
627 .filter_map(|full_name| {
628 let parts: Vec<&str> = full_name.split('.').collect();
629 if parts.len() == 2 && parts[0] == db_name {
630 Some(format!("\"public\".\"{}\"", parts[1]))
632 } else {
633 None
634 }
635 })
636 .collect()
637 })
638}
639
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Filter whose only populated argument is the fourth one, holding three
    /// `db.table` entries spread over `db1` and `db2`.
    fn filter_with_excludes() -> crate::filters::ReplicationFilter {
        let excluded: Vec<String> = vec![
            "db1.table1".to_string(),
            "db1.table2".to_string(),
            "db2.table3".to_string(),
        ];
        crate::filters::ReplicationFilter::new(None, None, None, Some(excluded)).unwrap()
    }

    /// Filter whose only populated argument is the third one, holding three
    /// `db.table` entries spread over `db1` and `db2`.
    fn filter_with_includes() -> crate::filters::ReplicationFilter {
        let included: Vec<String> = vec![
            "db1.users".to_string(),
            "db1.orders".to_string(),
            "db2.products".to_string(),
        ];
        crate::filters::ReplicationFilter::new(None, None, Some(included), None).unwrap()
    }

    // Needs a reachable server in TEST_SOURCE_URL, hence ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_dump_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let workdir = tempdir().unwrap();
        let target = workdir.path().join("globals.sql");

        let outcome = dump_globals(&source_url, target.to_str().unwrap()).await;

        assert!(outcome.is_ok());
        assert!(target.exists());

        let dumped = std::fs::read_to_string(&target).unwrap();
        assert!(dumped.contains("CREATE ROLE") || !dumped.is_empty());
    }

    // Needs a reachable server in TEST_SOURCE_URL, hence ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_dump_schema() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let workdir = tempdir().unwrap();
        let target = workdir.path().join("schema.sql");

        // The database name is taken to be the last `/`-separated URL segment.
        let db_name = source_url.rsplit('/').next().unwrap_or("postgres");

        let filter = crate::filters::ReplicationFilter::empty();
        let outcome = dump_schema(&source_url, db_name, target.to_str().unwrap(), &filter).await;

        assert!(outcome.is_ok());
        assert!(target.exists());
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db() {
        let filter = filter_with_excludes();

        let db1 = get_schema_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(db1, ["\"public\".\"table1\"", "\"public\".\"table2\""]);

        let db2 = get_schema_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, ["\"public\".\"table3\""]);

        let db3 = get_schema_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_data_excluded_tables_for_db() {
        let filter = filter_with_excludes();

        let db1 = get_data_excluded_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(db1, ["\"public\".\"table1\"", "\"public\".\"table2\""]);

        let db2 = get_data_excluded_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, ["\"public\".\"table3\""]);

        let db3 = get_data_excluded_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_included_tables_for_db() {
        let filter = filter_with_includes();

        let db1 = get_included_tables_for_db(&filter, "db1").unwrap();
        assert_eq!(db1, ["\"public\".\"users\"", "\"public\".\"orders\""]);

        let db2 = get_included_tables_for_db(&filter, "db2").unwrap();
        assert_eq!(db2, ["\"public\".\"products\""]);

        let db3 = get_included_tables_for_db(&filter, "db3");
        assert!(db3.map_or(true, |tables| tables.is_empty()));
    }

    #[test]
    fn test_get_schema_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(get_schema_excluded_tables_for_db(&filter, "db1").is_none());
    }

    #[test]
    fn test_get_data_excluded_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(get_data_excluded_tables_for_db(&filter, "db1").is_none());
    }

    #[test]
    fn test_get_included_tables_for_db_with_empty_filter() {
        let filter = crate::filters::ReplicationFilter::empty();
        assert!(get_included_tables_for_db(&filter, "db1").is_none());
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_unquoted_role() {
        let input = "CREATE ROLE replicator WITH LOGIN;\nALTER ROLE replicator WITH LOGIN;\n";
        let output = rewrite_create_role_statements(input).expect("rewrite expected");

        for expected in [
            "DO $$",
            "Role replicator already exists",
            "CREATE ROLE replicator WITH LOGIN;",
            "ALTER ROLE replicator WITH LOGIN;",
        ] {
            assert!(output.contains(expected));
        }
    }

    #[test]
    fn test_rewrite_create_role_statements_wraps_quoted_role() {
        let input = " CREATE ROLE \"Andre Admin\";\n";
        let output = rewrite_create_role_statements(input).expect("rewrite expected");

        for expected in [
            "DO $$",
            "Andre Admin already exists",
            "CREATE ROLE \"Andre Admin\"",
        ] {
            assert!(output.contains(expected));
        }
        // The statement's leading indent is carried over to the DO block.
        assert!(output.starts_with(" DO $$"));
    }

    #[test]
    fn test_rewrite_create_role_statements_noop_when_absent() {
        let input = "ALTER ROLE existing WITH LOGIN;\n";
        assert!(rewrite_create_role_statements(input).is_none());
    }
}