database_replicator/migration/
restore.rs1use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7use tokio::process::Command as TokioCommand;
8
9pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
11 tracing::info!("Restoring global objects from {}", input_path);
12
13 let parts = crate::utils::parse_postgres_url(target_url)
15 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
16 let pgpass = crate::utils::PgPassFile::new(&parts)
17 .context("Failed to create .pgpass file for authentication")?;
18
19 let env_vars = parts.to_pg_env_vars();
20
21 let mut cmd = TokioCommand::new("psql");
22 cmd.arg("--host")
23 .arg(&parts.host)
24 .arg("--port")
25 .arg(parts.port.to_string())
26 .arg("--dbname")
27 .arg(&parts.database)
28 .arg(format!("--file={}", input_path))
29 .arg("--quiet")
30 .arg("-v")
31 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
33 .stdout(Stdio::piped())
34 .stderr(Stdio::piped());
35
36 if let Some(user) = &parts.user {
38 cmd.arg("--username").arg(user);
39 }
40
41 for (env_var, value) in &env_vars {
43 cmd.env(env_var, value);
44 }
45
46 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
48 cmd.env(env_var, value);
49 }
50
51 cmd.env("PGCONNECT_TIMEOUT", "30");
53
54 let output = cmd.output().await.context(
55 "Failed to execute psql. Is PostgreSQL client installed?\n\
56 Install with:\n\
57 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58 - macOS: brew install postgresql\n\
59 - RHEL/CentOS: sudo yum install postgresql",
60 )?;
61
62 if output.status.success() {
63 tracing::info!("✓ Global objects restored");
64 let stderr = String::from_utf8_lossy(&output.stderr);
66 if !stderr.trim().is_empty() {
67 tracing::debug!("psql stderr output:\n{}", stderr);
68 }
69 Ok(())
70 } else {
71 let stderr = String::from_utf8_lossy(&output.stderr);
72 let stdout = String::from_utf8_lossy(&output.stdout);
73
74 if stderr.contains("already exists") && stderr.contains("skipping") {
76 tracing::warn!(
77 "⚠ Some global objects already exist, which is usually safe. Full output:\n{}",
78 stderr
79 );
80 Ok(()) } else {
82 anyhow::bail!(
84 "Failed to restore global objects.\n\
85 Exit Code: {}\n\
86 Stderr:\n{}\n\
87 Stdout:\n{}",
88 output.status,
89 stderr,
90 stdout
91 );
92 }
93 }
94}
95
96pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
98 tracing::info!("Restoring schema from {}", input_path);
99
100 let parts = crate::utils::parse_postgres_url(target_url)
102 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
103 let pgpass = crate::utils::PgPassFile::new(&parts)
104 .context("Failed to create .pgpass file for authentication")?;
105
106 let env_vars = parts.to_pg_env_vars();
107 let input_path_owned = input_path.to_string();
108
109 crate::utils::retry_subprocess_with_backoff(
111 || {
112 let mut cmd = Command::new("psql");
113 cmd.arg("--host")
114 .arg(&parts.host)
115 .arg("--port")
116 .arg(parts.port.to_string())
117 .arg("--dbname")
118 .arg(&parts.database)
119 .arg(format!("--file={}", input_path_owned))
120 .arg("--quiet")
121 .arg("-v")
122 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
124 .stdout(Stdio::inherit())
125 .stderr(Stdio::inherit());
126
127 if let Some(user) = &parts.user {
129 cmd.arg("--username").arg(user);
130 }
131
132 for (env_var, value) in &env_vars {
134 cmd.env(env_var, value);
135 }
136
137 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
139 cmd.env(env_var, value);
140 }
141
142 cmd.env("PGCONNECT_TIMEOUT", "30");
144
145 cmd.status().context(
146 "Failed to execute psql. Is PostgreSQL client installed?\n\
147 Install with:\n\
148 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
149 - macOS: brew install postgresql\n\
150 - RHEL/CentOS: sudo yum install postgresql",
151 )
152 },
153 3, Duration::from_secs(1), "psql (restore schema)",
156 )
157 .await
158 .context(
159 "Schema restoration failed.\n\
160 \n\
161 Common causes:\n\
162 - Target database does not exist\n\
163 - User lacks CREATE privileges on target\n\
164 - Schema objects already exist (try dropping them first)\n\
165 - Version incompatibility between source and target\n\
166 - Syntax errors in dump file\n\
167 - Connection timeout or network issues",
168 )?;
169
170 tracing::info!("✓ Schema restored successfully");
171 Ok(())
172}
173
174pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
183 let num_cpus = std::thread::available_parallelism()
185 .map(|n| n.get().min(8))
186 .unwrap_or(4);
187
188 tracing::info!(
189 "Restoring data from {} (parallel={}, format=directory)",
190 input_path,
191 num_cpus
192 );
193
194 let parts = crate::utils::parse_postgres_url(target_url)
196 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
197 let pgpass = crate::utils::PgPassFile::new(&parts)
198 .context("Failed to create .pgpass file for authentication")?;
199
200 let env_vars = parts.to_pg_env_vars();
201 let input_path_owned = input_path.to_string();
202
203 crate::utils::retry_subprocess_with_backoff(
205 || {
206 let mut cmd = Command::new("pg_restore");
207 cmd.arg("--data-only")
208 .arg("--no-owner")
209 .arg(format!("--jobs={}", num_cpus)) .arg("--host")
211 .arg(&parts.host)
212 .arg("--port")
213 .arg(parts.port.to_string())
214 .arg("--dbname")
215 .arg(&parts.database)
216 .arg("--format=directory") .arg("--verbose") .arg(&input_path_owned)
219 .env("PGPASSFILE", pgpass.path())
220 .stdout(Stdio::inherit())
221 .stderr(Stdio::inherit());
222
223 if let Some(user) = &parts.user {
225 cmd.arg("--username").arg(user);
226 }
227
228 for (env_var, value) in &env_vars {
230 cmd.env(env_var, value);
231 }
232
233 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
235 cmd.env(env_var, value);
236 }
237
238 cmd.env("PGCONNECT_TIMEOUT", "30");
240
241 cmd.status().context(
242 "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
243 Install with:\n\
244 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
245 - macOS: brew install postgresql\n\
246 - RHEL/CentOS: sudo yum install postgresql",
247 )
248 },
249 3, Duration::from_secs(1), "pg_restore (restore data)",
252 )
253 .await
254 .context(
255 "Data restoration failed.\n\
256 \n\
257 Common causes:\n\
258 - Foreign key constraint violations\n\
259 - Unique constraint violations (data already exists)\n\
260 - User lacks INSERT privileges on target tables\n\
261 - Disk space issues on target\n\
262 - Data type mismatches\n\
263 - Input directory is not a valid pg_dump directory format\n\
264 - Connection timeout or network issues",
265 )?;
266
267 tracing::info!(
268 "✓ Data restored successfully using {} parallel jobs",
269 num_cpus
270 );
271 Ok(())
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277 use crate::migration::dump;
278 use tempfile::tempdir;
279
280 #[tokio::test]
281 #[ignore]
282 async fn test_restore_globals() {
283 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
284 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
285
286 let dir = tempdir().unwrap();
287 let dump_file = dir.path().join("globals.sql");
288
289 dump::dump_globals(&source_url, dump_file.to_str().unwrap())
291 .await
292 .unwrap();
293
294 let result = restore_globals(&target_url, dump_file.to_str().unwrap()).await;
296 assert!(result.is_ok());
297 }
298}