database_replicator/migration/
restore.rs1use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7
8pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
10 tracing::info!("Restoring global objects from {}", input_path);
11
12 let parts = crate::utils::parse_postgres_url(target_url)
14 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
15 let pgpass = crate::utils::PgPassFile::new(&parts)
16 .context("Failed to create .pgpass file for authentication")?;
17
18 let env_vars = parts.to_pg_env_vars();
19 let input_path_owned = input_path.to_string();
20
21 let result = crate::utils::retry_subprocess_with_backoff(
23 || {
24 let mut cmd = Command::new("psql");
25 cmd.arg("--host")
26 .arg(&parts.host)
27 .arg("--port")
28 .arg(parts.port.to_string())
29 .arg("--dbname")
30 .arg(&parts.database)
31 .arg(format!("--file={}", input_path_owned))
32 .arg("--quiet")
33 .arg("-v")
34 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
36 .stdout(Stdio::inherit())
37 .stderr(Stdio::inherit());
38
39 if let Some(user) = &parts.user {
41 cmd.arg("--username").arg(user);
42 }
43
44 for (env_var, value) in &env_vars {
46 cmd.env(env_var, value);
47 }
48
49 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
51 cmd.env(env_var, value);
52 }
53
54 cmd.status().context(
55 "Failed to execute psql. Is PostgreSQL client installed?\n\
56 Install with:\n\
57 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58 - macOS: brew install postgresql\n\
59 - RHEL/CentOS: sudo yum install postgresql",
60 )
61 },
62 3, Duration::from_secs(1), "psql (restore globals)",
65 )
66 .await;
67
68 match result {
70 Ok(()) => {
71 tracing::info!("✓ Global objects restored");
72 Ok(())
73 }
74 Err(e) => {
75 tracing::warn!("⚠ Some global object restoration warnings occurred: {}", e);
76 Ok(())
78 }
79 }
80}
81
82pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
84 tracing::info!("Restoring schema from {}", input_path);
85
86 let parts = crate::utils::parse_postgres_url(target_url)
88 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
89 let pgpass = crate::utils::PgPassFile::new(&parts)
90 .context("Failed to create .pgpass file for authentication")?;
91
92 let env_vars = parts.to_pg_env_vars();
93 let input_path_owned = input_path.to_string();
94
95 crate::utils::retry_subprocess_with_backoff(
97 || {
98 let mut cmd = Command::new("psql");
99 cmd.arg("--host")
100 .arg(&parts.host)
101 .arg("--port")
102 .arg(parts.port.to_string())
103 .arg("--dbname")
104 .arg(&parts.database)
105 .arg(format!("--file={}", input_path_owned))
106 .arg("--quiet")
107 .arg("-v")
108 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
110 .stdout(Stdio::inherit())
111 .stderr(Stdio::inherit());
112
113 if let Some(user) = &parts.user {
115 cmd.arg("--username").arg(user);
116 }
117
118 for (env_var, value) in &env_vars {
120 cmd.env(env_var, value);
121 }
122
123 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
125 cmd.env(env_var, value);
126 }
127
128 cmd.status().context(
129 "Failed to execute psql. Is PostgreSQL client installed?\n\
130 Install with:\n\
131 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
132 - macOS: brew install postgresql\n\
133 - RHEL/CentOS: sudo yum install postgresql",
134 )
135 },
136 3, Duration::from_secs(1), "psql (restore schema)",
139 )
140 .await
141 .context(
142 "Schema restoration failed.\n\
143 \n\
144 Common causes:\n\
145 - Target database does not exist\n\
146 - User lacks CREATE privileges on target\n\
147 - Schema objects already exist (try dropping them first)\n\
148 - Version incompatibility between source and target\n\
149 - Syntax errors in dump file\n\
150 - Connection timeout or network issues",
151 )?;
152
153 tracing::info!("✓ Schema restored successfully");
154 Ok(())
155}
156
157pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
166 let num_cpus = std::thread::available_parallelism()
168 .map(|n| n.get().min(8))
169 .unwrap_or(4);
170
171 tracing::info!(
172 "Restoring data from {} (parallel={}, format=directory)",
173 input_path,
174 num_cpus
175 );
176
177 let parts = crate::utils::parse_postgres_url(target_url)
179 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
180 let pgpass = crate::utils::PgPassFile::new(&parts)
181 .context("Failed to create .pgpass file for authentication")?;
182
183 let env_vars = parts.to_pg_env_vars();
184 let input_path_owned = input_path.to_string();
185
186 crate::utils::retry_subprocess_with_backoff(
188 || {
189 let mut cmd = Command::new("pg_restore");
190 cmd.arg("--data-only")
191 .arg("--no-owner")
192 .arg(format!("--jobs={}", num_cpus)) .arg("--host")
194 .arg(&parts.host)
195 .arg("--port")
196 .arg(parts.port.to_string())
197 .arg("--dbname")
198 .arg(&parts.database)
199 .arg("--format=directory") .arg("--verbose") .arg(&input_path_owned)
202 .env("PGPASSFILE", pgpass.path())
203 .stdout(Stdio::inherit())
204 .stderr(Stdio::inherit());
205
206 if let Some(user) = &parts.user {
208 cmd.arg("--username").arg(user);
209 }
210
211 for (env_var, value) in &env_vars {
213 cmd.env(env_var, value);
214 }
215
216 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
218 cmd.env(env_var, value);
219 }
220
221 cmd.status().context(
222 "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
223 Install with:\n\
224 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
225 - macOS: brew install postgresql\n\
226 - RHEL/CentOS: sudo yum install postgresql",
227 )
228 },
229 3, Duration::from_secs(1), "pg_restore (restore data)",
232 )
233 .await
234 .context(
235 "Data restoration failed.\n\
236 \n\
237 Common causes:\n\
238 - Foreign key constraint violations\n\
239 - Unique constraint violations (data already exists)\n\
240 - User lacks INSERT privileges on target tables\n\
241 - Disk space issues on target\n\
242 - Data type mismatches\n\
243 - Input directory is not a valid pg_dump directory format\n\
244 - Connection timeout or network issues",
245 )?;
246
247 tracing::info!(
248 "✓ Data restored successfully using {} parallel jobs",
249 num_cpus
250 );
251 Ok(())
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::dump;
    use tempfile::tempdir;

    /// Dumps globals from the source server into a temporary file and checks
    /// that `restore_globals` reports success against the target server.
    ///
    /// Ignored by default; it reads the `TEST_SOURCE_URL` and
    /// `TEST_TARGET_URL` environment variables and panics if either is unset.
    #[tokio::test]
    #[ignore]
    async fn test_restore_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let target_url = std::env::var("TEST_TARGET_URL").unwrap();

        // The scratch directory must stay bound until the end of the test so
        // the dump file it holds remains available to the restore step.
        let scratch = tempdir().unwrap();
        let globals_path = scratch.path().join("globals.sql");
        let globals_path = globals_path.to_str().unwrap();

        dump::dump_globals(&source_url, globals_path).await.unwrap();

        let outcome = restore_globals(&target_url, globals_path).await;
        assert!(outcome.is_ok());
    }
}