database_replicator/migration/
restore.rs1use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7
8pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
10 tracing::info!("Restoring global objects from {}", input_path);
11
12 let parts = crate::utils::parse_postgres_url(target_url)
14 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
15 let pgpass = crate::utils::PgPassFile::new(&parts)
16 .context("Failed to create .pgpass file for authentication")?;
17
18 let env_vars = parts.to_pg_env_vars();
19 let input_path_owned = input_path.to_string();
20
21 let result = crate::utils::retry_subprocess_with_backoff(
23 || {
24 let mut cmd = Command::new("psql");
25 cmd.arg("--host")
26 .arg(&parts.host)
27 .arg("--port")
28 .arg(parts.port.to_string())
29 .arg("--dbname")
30 .arg(&parts.database)
31 .arg(format!("--file={}", input_path_owned))
32 .arg("--quiet")
33 .arg("-v")
34 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
36 .stdout(Stdio::inherit())
37 .stderr(Stdio::inherit());
38
39 if let Some(user) = &parts.user {
41 cmd.arg("--username").arg(user);
42 }
43
44 for (env_var, value) in &env_vars {
46 cmd.env(env_var, value);
47 }
48
49 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
51 cmd.env(env_var, value);
52 }
53
54 cmd.status().context(
55 "Failed to execute psql. Is PostgreSQL client installed?\n\
56 Install with:\n\
57 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58 - macOS: brew install postgresql\n\
59 - RHEL/CentOS: sudo yum install postgresql",
60 )
61 },
62 3, Duration::from_secs(1), "psql (restore globals)",
65 );
66
67 match result {
69 Ok(()) => {
70 tracing::info!("✓ Global objects restored");
71 Ok(())
72 }
73 Err(e) => {
74 tracing::warn!("⚠ Some global object restoration warnings occurred: {}", e);
75 Ok(())
77 }
78 }
79}
80
81pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
83 tracing::info!("Restoring schema from {}", input_path);
84
85 let parts = crate::utils::parse_postgres_url(target_url)
87 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
88 let pgpass = crate::utils::PgPassFile::new(&parts)
89 .context("Failed to create .pgpass file for authentication")?;
90
91 let env_vars = parts.to_pg_env_vars();
92 let input_path_owned = input_path.to_string();
93
94 crate::utils::retry_subprocess_with_backoff(
96 || {
97 let mut cmd = Command::new("psql");
98 cmd.arg("--host")
99 .arg(&parts.host)
100 .arg("--port")
101 .arg(parts.port.to_string())
102 .arg("--dbname")
103 .arg(&parts.database)
104 .arg(format!("--file={}", input_path_owned))
105 .arg("--quiet")
106 .arg("-v")
107 .arg("ON_ERROR_STOP=1") .env("PGPASSFILE", pgpass.path())
109 .stdout(Stdio::inherit())
110 .stderr(Stdio::inherit());
111
112 if let Some(user) = &parts.user {
114 cmd.arg("--username").arg(user);
115 }
116
117 for (env_var, value) in &env_vars {
119 cmd.env(env_var, value);
120 }
121
122 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
124 cmd.env(env_var, value);
125 }
126
127 cmd.status().context(
128 "Failed to execute psql. Is PostgreSQL client installed?\n\
129 Install with:\n\
130 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
131 - macOS: brew install postgresql\n\
132 - RHEL/CentOS: sudo yum install postgresql",
133 )
134 },
135 3, Duration::from_secs(1), "psql (restore schema)",
138 )
139 .context(
140 "Schema restoration failed.\n\
141 \n\
142 Common causes:\n\
143 - Target database does not exist\n\
144 - User lacks CREATE privileges on target\n\
145 - Schema objects already exist (try dropping them first)\n\
146 - Version incompatibility between source and target\n\
147 - Syntax errors in dump file\n\
148 - Connection timeout or network issues",
149 )?;
150
151 tracing::info!("✓ Schema restored successfully");
152 Ok(())
153}
154
155pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
164 let num_cpus = std::thread::available_parallelism()
166 .map(|n| n.get().min(8))
167 .unwrap_or(4);
168
169 tracing::info!(
170 "Restoring data from {} (parallel={}, format=directory)",
171 input_path,
172 num_cpus
173 );
174
175 let parts = crate::utils::parse_postgres_url(target_url)
177 .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
178 let pgpass = crate::utils::PgPassFile::new(&parts)
179 .context("Failed to create .pgpass file for authentication")?;
180
181 let env_vars = parts.to_pg_env_vars();
182 let input_path_owned = input_path.to_string();
183
184 crate::utils::retry_subprocess_with_backoff(
186 || {
187 let mut cmd = Command::new("pg_restore");
188 cmd.arg("--data-only")
189 .arg("--no-owner")
190 .arg(format!("--jobs={}", num_cpus)) .arg("--host")
192 .arg(&parts.host)
193 .arg("--port")
194 .arg(parts.port.to_string())
195 .arg("--dbname")
196 .arg(&parts.database)
197 .arg("--format=directory") .arg("--verbose") .arg(&input_path_owned)
200 .env("PGPASSFILE", pgpass.path())
201 .stdout(Stdio::inherit())
202 .stderr(Stdio::inherit());
203
204 if let Some(user) = &parts.user {
206 cmd.arg("--username").arg(user);
207 }
208
209 for (env_var, value) in &env_vars {
211 cmd.env(env_var, value);
212 }
213
214 for (env_var, value) in crate::utils::get_keepalive_env_vars() {
216 cmd.env(env_var, value);
217 }
218
219 cmd.status().context(
220 "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
221 Install with:\n\
222 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
223 - macOS: brew install postgresql\n\
224 - RHEL/CentOS: sudo yum install postgresql",
225 )
226 },
227 3, Duration::from_secs(1), "pg_restore (restore data)",
230 )
231 .context(
232 "Data restoration failed.\n\
233 \n\
234 Common causes:\n\
235 - Foreign key constraint violations\n\
236 - Unique constraint violations (data already exists)\n\
237 - User lacks INSERT privileges on target tables\n\
238 - Disk space issues on target\n\
239 - Data type mismatches\n\
240 - Input directory is not a valid pg_dump directory format\n\
241 - Connection timeout or network issues",
242 )?;
243
244 tracing::info!(
245 "✓ Data restored successfully using {} parallel jobs",
246 num_cpus
247 );
248 Ok(())
249}
250
251#[cfg(test)]
252mod tests {
253 use super::*;
254 use crate::migration::dump;
255 use tempfile::tempdir;
256
257 #[tokio::test]
258 #[ignore]
259 async fn test_restore_globals() {
260 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
261 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
262
263 let dir = tempdir().unwrap();
264 let dump_file = dir.path().join("globals.sql");
265
266 dump::dump_globals(&source_url, dump_file.to_str().unwrap())
268 .await
269 .unwrap();
270
271 let result = restore_globals(&target_url, dump_file.to_str().unwrap()).await;
273 assert!(result.is_ok());
274 }
275}