database_replicator/migration/
restore.rs1use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7use tokio::process::Command as TokioCommand;
8
9pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
11 tracing::info!("Restoring global objects from {}", input_path);
12
13 let parts = crate::utils::parse_postgres_url(target_url)
15 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
16 let pgpass = crate::utils::PgPassFile::new(&parts)
17 .context("Failed to create .pgpass file for authentication")?;
18
19 let env_vars = parts.to_pg_env_vars();
20
21 let mut cmd = TokioCommand::new("psql");
22 cmd.arg("--host")
23 .arg(&parts.host)
24 .arg("--port")
25 .arg(parts.port.to_string())
26 .arg("--dbname")
27 .arg(&parts.database)
28 .arg(format!("--file={}", input_path))
29 .arg("--quiet")
30 .arg("-v")
31 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
33 .stdout(Stdio::piped())
34 .stderr(Stdio::piped());
35
36 if let Some(user) = &parts.user {
38 cmd.arg("--username").arg(user);
39 }
40
41 for (env_var, value) in &env_vars {
43 cmd.env(env_var, value);
44 }
45
46 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
48 cmd.env(env_var, value);
49 }
50
51 cmd.env("PGCONNECT_TIMEOUT", "30");
53
54 let output = cmd.output().await.context(
55 "Failed to execute psql. Is PostgreSQL client installed?\n\
56 Install with:\n\
57 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58 - macOS: brew install postgresql\n\
59 - RHEL/CentOS: sudo yum install postgresql",
60 )?;
61
62 if output.status.success() {
63 tracing::info!("✓ Global objects restored");
64 let stderr = String::from_utf8_lossy(&output.stderr);
66 if !stderr.trim().is_empty() {
67 tracing::debug!("psql stderr output:\n{}", stderr);
68 }
69 Ok(())
70 } else {
71 let stderr = String::from_utf8_lossy(&output.stderr);
72 let stdout = String::from_utf8_lossy(&output.stdout);
73
74 if stderr.contains("already exists") && stderr.contains("skipping") {
76 tracing::warn!(
77 "⚠ Some global objects already exist, which is usually safe. Full output:\n{}",
78 stderr
79 );
80 Ok(()) } else {
82 anyhow::bail!(
84 "Failed to restore global objects.\n\
85 Exit Code: {}\n\
86 Stderr:\n{}\n\
87 Stdout:\n{}",
88 output.status,
89 stderr,
90 stdout
91 );
92 }
93 }
94}
95
96pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
98 tracing::info!("Restoring schema from {}", input_path);
99
100 let parts = crate::utils::parse_postgres_url(target_url)
102 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
103 let pgpass = crate::utils::PgPassFile::new(&parts)
104 .context("Failed to create .pgpass file for authentication")?;
105
106 let env_vars = parts.to_pg_env_vars();
107 let input_path_owned = input_path.to_string();
108
109 crate::utils::retry_subprocess_with_backoff(
111 || {
112 let mut cmd = Command::new("psql");
113 cmd.arg("--host")
114 .arg(&parts.host)
115 .arg("--port")
116 .arg(parts.port.to_string())
117 .arg("--dbname")
118 .arg(&parts.database)
119 .arg(format!("--file={}", input_path_owned))
120 .arg("--quiet")
121 .arg("-v")
122 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
124 .stdout(Stdio::inherit())
125 .stderr(Stdio::inherit());
126
127 if let Some(user) = &parts.user {
129 cmd.arg("--username").arg(user);
130 }
131
132 for (env_var, value) in &env_vars {
134 cmd.env(env_var, value);
135 }
136
137 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
139 cmd.env(env_var, value);
140 }
141
142 cmd.env("PGCONNECT_TIMEOUT", "30");
144
145 cmd.status().context(
146 "Failed to execute psql. Is PostgreSQL client installed?\n\
147 Install with:\n\
148 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
149 - macOS: brew install postgresql\n\
150 - RHEL/CentOS: sudo yum install postgresql",
151 )
152 },
153 3, Duration::from_secs(1), "psql (restore schema)",
156 )
157 .await
158 .context(
159 "Schema restoration failed.\n\
160 \n\
161 Common causes:\n\
162 - Target database does not exist\n\
163 - User lacks CREATE privileges on target\n\
164 - Schema objects already exist (try dropping them first)\n\
165 - Version incompatibility between source and target\n\
166 - Syntax errors in dump file\n\
167 - Connection timeout or network issues",
168 )?;
169
170 tracing::info!("✓ Schema restored successfully");
171 Ok(())
172}
173
174pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
192 let num_cpus = std::thread::available_parallelism()
194 .map(|n| n.get().min(8))
195 .unwrap_or(4);
196
197 tracing::info!(
198 "Restoring data from {} (parallel={}, format=directory)",
199 input_path,
200 num_cpus
201 );
202
203 let parts = crate::utils::parse_postgres_url(target_url)
205 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
206 let pgpass = crate::utils::PgPassFile::new(&parts)
207 .context("Failed to create .pgpass file for authentication")?;
208
209 let env_vars = parts.to_pg_env_vars();
210
211 let mut cmd = Command::new("pg_restore");
219 cmd.arg("--data-only")
220 .arg("--no-owner")
221 .arg("--disable-triggers") .arg(format!("--jobs={}", num_cpus)) .arg("--host")
224 .arg(&parts.host)
225 .arg("--port")
226 .arg(parts.port.to_string())
227 .arg("--dbname")
228 .arg(&parts.database)
229 .arg("--format=directory") .arg("--verbose") .arg(input_path)
232 .env("PGPASSFILE", pgpass.path())
233 .stdout(Stdio::inherit())
234 .stderr(Stdio::inherit());
235
236 if let Some(user) = &parts.user {
238 cmd.arg("--username").arg(user);
239 }
240
241 for (env_var, value) in &env_vars {
243 cmd.env(env_var, value);
244 }
245
246 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
248 cmd.env(env_var, value);
249 }
250
251 cmd.env("PGCONNECT_TIMEOUT", "30");
253
254 let status = cmd.status().context(
255 "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
256 Install with:\n\
257 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
258 - macOS: brew install postgresql\n\
259 - RHEL/CentOS: sudo yum install postgresql",
260 )?;
261
262 if !status.success() {
263 anyhow::bail!(
264 "Data restoration failed (exit code: {}).\n\
265 \n\
266 Common causes:\n\
267 - Foreign key constraint violations\n\
268 - Unique constraint violations (data already exists from a previous partial restore)\n\
269 - User lacks INSERT privileges on target tables\n\
270 - Disk space issues on target\n\
271 - Data type mismatches\n\
272 - Input directory is not a valid pg_dump directory format\n\
273 - Connection timeout or network issues\n\
274 \n\
275 If you see 'duplicate key' errors, re-run with --drop-existing to ensure a clean database.",
276 status.code().unwrap_or(-1)
277 );
278 }
279
280 tracing::info!(
281 "✓ Data restored successfully using {} parallel jobs",
282 num_cpus
283 );
284 Ok(())
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::dump;
    use tempfile::tempdir;

    /// Dumps globals from `TEST_SOURCE_URL` and restores them into
    /// `TEST_TARGET_URL`.
    ///
    /// Marked `#[ignore]` because it needs reachable source and target
    /// databases; run it explicitly with `cargo test -- --ignored` after
    /// exporting both variables.
    #[tokio::test]
    #[ignore]
    async fn test_restore_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL")
            .expect("TEST_SOURCE_URL must be set to run this test");
        let target_url = std::env::var("TEST_TARGET_URL")
            .expect("TEST_TARGET_URL must be set to run this test");

        let dir = tempdir().expect("failed to create temporary directory");
        let dump_file = dir.path().join("globals.sql");
        let dump_path = dump_file
            .to_str()
            .expect("temporary dump path is not valid UTF-8");

        dump::dump_globals(&source_url, dump_path)
            .await
            .expect("dump_globals failed");

        // `expect` (rather than `assert!(result.is_ok())`) prints the full error
        // chain, including psql's stderr, when the restore fails.
        restore_globals(&target_url, dump_path)
            .await
            .expect("restore_globals failed");
    }
}