database_replicator/migration/
restore.rs1use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7use tokio::process::Command as TokioCommand;
8
9pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
11 tracing::info!("Restoring global objects from {}", input_path);
12
13 let parts = crate::utils::parse_postgres_url(target_url)
15 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
16 let pgpass = crate::utils::PgPassFile::new(&parts)
17 .context("Failed to create .pgpass file for authentication")?;
18
19 let env_vars = parts.to_pg_env_vars();
20
21 let mut cmd = TokioCommand::new("psql");
22 cmd.arg("--host")
23 .arg(&parts.host)
24 .arg("--port")
25 .arg(parts.port.to_string())
26 .arg("--dbname")
27 .arg(&parts.database)
28 .arg(format!("--file={}", input_path))
29 .arg("--quiet")
30 .arg("-v")
31 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
33 .stdout(Stdio::piped())
34 .stderr(Stdio::piped());
35
36 if let Some(user) = &parts.user {
38 cmd.arg("--username").arg(user);
39 }
40
41 for (env_var, value) in &env_vars {
43 cmd.env(env_var, value);
44 }
45
46 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
48 cmd.env(env_var, value);
49 }
50
51 cmd.env("PGCONNECT_TIMEOUT", "30");
53
54 let output = cmd.output().await.context(
55 "Failed to execute psql. Is PostgreSQL client installed?\n\
56 Install with:\n\
57 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58 - macOS: brew install postgresql\n\
59 - RHEL/CentOS: sudo yum install postgresql",
60 )?;
61
62 if output.status.success() {
63 tracing::info!("✓ Global objects restored");
64 let stderr = String::from_utf8_lossy(&output.stderr);
66 if !stderr.trim().is_empty() {
67 tracing::debug!("psql stderr output:\n{}", stderr);
68 }
69 Ok(())
70 } else {
71 let stderr = String::from_utf8_lossy(&output.stderr);
72 let stdout = String::from_utf8_lossy(&output.stdout);
73
74 if stderr.contains("already exists") && stderr.contains("skipping") {
76 tracing::warn!(
77 "⚠ Some global objects already exist, which is usually safe. Full output:\n{}",
78 stderr
79 );
80 Ok(()) } else {
82 anyhow::bail!(
84 "Failed to restore global objects.\n\
85 Exit Code: {}\n\
86 Stderr:\n{}\n\
87 Stdout:\n{}",
88 output.status,
89 stderr,
90 stdout
91 );
92 }
93 }
94}
95
96pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
98 tracing::info!("Restoring schema from {}", input_path);
99
100 let parts = crate::utils::parse_postgres_url(target_url)
102 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
103 let pgpass = crate::utils::PgPassFile::new(&parts)
104 .context("Failed to create .pgpass file for authentication")?;
105
106 let env_vars = parts.to_pg_env_vars();
107 let input_path_owned = input_path.to_string();
108
109 crate::utils::retry_subprocess_with_backoff(
111 || {
112 let mut cmd = Command::new("psql");
113 cmd.arg("--host")
114 .arg(&parts.host)
115 .arg("--port")
116 .arg(parts.port.to_string())
117 .arg("--dbname")
118 .arg(&parts.database)
119 .arg(format!("--file={}", input_path_owned))
120 .arg("--quiet")
121 .arg("-v")
122 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
124 .stdout(Stdio::inherit())
125 .stderr(Stdio::inherit());
126
127 if let Some(user) = &parts.user {
129 cmd.arg("--username").arg(user);
130 }
131
132 for (env_var, value) in &env_vars {
134 cmd.env(env_var, value);
135 }
136
137 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
139 cmd.env(env_var, value);
140 }
141
142 cmd.env("PGCONNECT_TIMEOUT", "30");
144
145 cmd.status().context(
146 "Failed to execute psql. Is PostgreSQL client installed?\n\
147 Install with:\n\
148 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
149 - macOS: brew install postgresql\n\
150 - RHEL/CentOS: sudo yum install postgresql",
151 )
152 },
153 3, Duration::from_secs(1), "psql (restore schema)",
156 )
157 .await
158 .context(
159 "Schema restoration failed.\n\
160 \n\
161 Common causes:\n\
162 - Target database does not exist\n\
163 - User lacks CREATE privileges on target\n\
164 - Schema objects already exist (try dropping them first)\n\
165 - Version incompatibility between source and target\n\
166 - Syntax errors in dump file\n\
167 - Connection timeout or network issues",
168 )?;
169
170 tracing::info!("✓ Schema restored successfully");
171 Ok(())
172}
173
174pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
200 tracing::info!("Restoring data from {} (format=directory)", input_path);
201
202 let parts = crate::utils::parse_postgres_url(target_url)
204 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
205 let pgpass = crate::utils::PgPassFile::new(&parts)
206 .context("Failed to create .pgpass file for authentication")?;
207
208 let env_vars = parts.to_pg_env_vars();
209
210 let mut cmd = Command::new("pg_restore");
218 cmd.arg("--data-only")
219 .arg("--no-owner")
220 .arg("--host")
221 .arg(&parts.host)
222 .arg("--port")
223 .arg(parts.port.to_string())
224 .arg("--dbname")
225 .arg(&parts.database)
226 .arg("--format=directory") .arg("--verbose") .arg(input_path)
229 .env("PGPASSFILE", pgpass.path())
230 .stdout(Stdio::inherit())
231 .stderr(Stdio::inherit());
232
233 if let Some(user) = &parts.user {
235 cmd.arg("--username").arg(user);
236 }
237
238 for (env_var, value) in &env_vars {
240 cmd.env(env_var, value);
241 }
242
243 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
245 cmd.env(env_var, value);
246 }
247
248 cmd.env("PGCONNECT_TIMEOUT", "30");
250
251 let status = cmd.status().context(
252 "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
253 Install with:\n\
254 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
255 - macOS: brew install postgresql\n\
256 - RHEL/CentOS: sudo yum install postgresql",
257 )?;
258
259 if !status.success() {
260 anyhow::bail!(
261 "Data restoration failed (exit code: {}).\n\
262 \n\
263 Common causes:\n\
264 - Foreign key constraint violations\n\
265 - Unique constraint violations (data already exists from a previous partial restore)\n\
266 - User lacks INSERT privileges on target tables\n\
267 - Disk space issues on target\n\
268 - Data type mismatches\n\
269 - Input directory is not a valid pg_dump directory format\n\
270 - Connection timeout or network issues\n\
271 \n\
272 If you see 'duplicate key' errors, re-run with --drop-existing to ensure a clean database.",
273 status.code().unwrap_or(-1)
274 );
275 }
276
277 tracing::info!("✓ Data restored successfully");
278 Ok(())
279}
280
281#[cfg(test)]
282mod tests {
283 use super::*;
284 use crate::migration::dump;
285 use tempfile::tempdir;
286
287 #[tokio::test]
288 #[ignore]
289 async fn test_restore_globals() {
290 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
291 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
292
293 let dir = tempdir().unwrap();
294 let dump_file = dir.path().join("globals.sql");
295
296 dump::dump_globals(&source_url, dump_file.to_str().unwrap())
298 .await
299 .unwrap();
300
301 let result = restore_globals(&target_url, dump_file.to_str().unwrap()).await;
303 assert!(result.is_ok());
304 }
305}