database_replicator/migration/restore.rs

// ABOUTME: Wrapper for psql and pg_restore to import database objects
// ABOUTME: Restores global objects, schema, and data to target
3
4use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7use tokio::process::Command as TokioCommand;
8
9/// Restore global objects using psql
10pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
11    tracing::info!("Restoring global objects from {}", input_path);
12
13    // Parse URL and create .pgpass file for secure authentication
14    let parts = crate::utils::parse_postgres_url(target_url)
15        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
16    let pgpass = crate::utils::PgPassFile::new(&parts)
17        .context("Failed to create .pgpass file for authentication")?;
18
19    let env_vars = parts.to_pg_env_vars();
20
21    let mut cmd = TokioCommand::new("psql");
22    cmd.arg("--host")
23        .arg(&parts.host)
24        .arg("--port")
25        .arg(parts.port.to_string())
26        .arg("--dbname")
27        .arg(&parts.database)
28        .arg(format!("--file={}", input_path))
29        .arg("--quiet")
30        .arg("-v")
31        .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
32        .env("PGPASSFILE", pgpass.path())
33        .stdout(Stdio::piped())
34        .stderr(Stdio::piped());
35
36    // Add username if specified
37    if let Some(user) = &parts.user {
38        cmd.arg("--username").arg(user);
39    }
40
41    // Apply query parameters as environment variables (SSL, channel_binding, etc.)
42    for (env_var, value) in &env_vars {
43        cmd.env(env_var, value);
44    }
45
46    // Apply TCP keepalive parameters to prevent idle connection timeouts
47    for (env_var, value) in crate::utils::get_keepalive_env_vars() {
48        cmd.env(env_var, value);
49    }
50
51    // Mitigate hangs on serverless DBs with strict connection limits
52    cmd.env("PGCONNECT_TIMEOUT", "30");
53
54    let output = cmd.output().await.context(
55        "Failed to execute psql. Is PostgreSQL client installed?\n\
56         Install with:\n\
57         - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58         - macOS: brew install postgresql\n\
59         - RHEL/CentOS: sudo yum install postgresql",
60    )?;
61
62    if output.status.success() {
63        tracing::info!("✓ Global objects restored");
64        // Even on success, there might be NOTICES or WARNINGS on stderr
65        let stderr = String::from_utf8_lossy(&output.stderr);
66        if !stderr.trim().is_empty() {
67            tracing::debug!("psql stderr output:\n{}", stderr);
68        }
69        Ok(())
70    } else {
71        let stderr = String::from_utf8_lossy(&output.stderr);
72        let stdout = String::from_utf8_lossy(&output.stdout);
73
74        // Check for common, non-fatal warnings
75        if stderr.contains("already exists") && stderr.contains("skipping") {
76            tracing::warn!(
77                "⚠ Some global objects already exist, which is usually safe. Full output:\n{}",
78                stderr
79            );
80            Ok(()) // Treat as success
81        } else {
82            // A real error occurred
83            anyhow::bail!(
84                "Failed to restore global objects.\n\
85                 Exit Code: {}\n\
86                 Stderr:\n{}\n\
87                 Stdout:\n{}",
88                output.status,
89                stderr,
90                stdout
91            );
92        }
93    }
94}
95
96/// Restore schema using psql
97pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
98    tracing::info!("Restoring schema from {}", input_path);
99
100    // Parse URL and create .pgpass file for secure authentication
101    let parts = crate::utils::parse_postgres_url(target_url)
102        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
103    let pgpass = crate::utils::PgPassFile::new(&parts)
104        .context("Failed to create .pgpass file for authentication")?;
105
106    let env_vars = parts.to_pg_env_vars();
107    let input_path_owned = input_path.to_string();
108
109    // Wrap subprocess execution with retry logic
110    crate::utils::retry_subprocess_with_backoff(
111        || {
112            let mut cmd = Command::new("psql");
113            cmd.arg("--host")
114                .arg(&parts.host)
115                .arg("--port")
116                .arg(parts.port.to_string())
117                .arg("--dbname")
118                .arg(&parts.database)
119                .arg(format!("--file={}", input_path_owned))
120                .arg("--quiet")
121                .arg("-v")
122                .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
123                .env("PGPASSFILE", pgpass.path())
124                .stdout(Stdio::inherit())
125                .stderr(Stdio::inherit());
126
127            // Add username if specified
128            if let Some(user) = &parts.user {
129                cmd.arg("--username").arg(user);
130            }
131
132            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
133            for (env_var, value) in &env_vars {
134                cmd.env(env_var, value);
135            }
136
137            // Apply TCP keepalive parameters to prevent idle connection timeouts
138            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
139                cmd.env(env_var, value);
140            }
141
142            // Mitigate hangs on serverless DBs with strict connection limits
143            cmd.env("PGCONNECT_TIMEOUT", "30");
144
145            cmd.status().context(
146                "Failed to execute psql. Is PostgreSQL client installed?\n\
147                 Install with:\n\
148                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
149                 - macOS: brew install postgresql\n\
150                 - RHEL/CentOS: sudo yum install postgresql",
151            )
152        },
153        3,                      // Max 3 retries
154        Duration::from_secs(1), // Start with 1 second delay
155        "psql (restore schema)",
156    )
157    .await
158    .context(
159        "Schema restoration failed.\n\
160         \n\
161         Common causes:\n\
162         - Target database does not exist\n\
163         - User lacks CREATE privileges on target\n\
164         - Schema objects already exist (try dropping them first)\n\
165         - Version incompatibility between source and target\n\
166         - Syntax errors in dump file\n\
167         - Connection timeout or network issues",
168    )?;
169
170    tracing::info!("✓ Schema restored successfully");
171    Ok(())
172}
173
174/// Restore data using pg_restore with parallel jobs
175///
176/// Uses PostgreSQL directory format restore with:
177/// - Parallel restore for faster performance
178/// - Automatic decompression of compressed dump files
179/// - Optimized for directory format dumps created by dump_data()
180///
181/// The number of parallel jobs is automatically determined based on available CPU cores.
182pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
183    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
184    let num_cpus = std::thread::available_parallelism()
185        .map(|n| n.get().min(8))
186        .unwrap_or(4);
187
188    tracing::info!(
189        "Restoring data from {} (parallel={}, format=directory)",
190        input_path,
191        num_cpus
192    );
193
194    // Parse URL and create .pgpass file for secure authentication
195    let parts = crate::utils::parse_postgres_url(target_url)
196        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
197    let pgpass = crate::utils::PgPassFile::new(&parts)
198        .context("Failed to create .pgpass file for authentication")?;
199
200    let env_vars = parts.to_pg_env_vars();
201    let input_path_owned = input_path.to_string();
202
203    // Wrap subprocess execution with retry logic
204    crate::utils::retry_subprocess_with_backoff(
205        || {
206            let mut cmd = Command::new("pg_restore");
207            cmd.arg("--data-only")
208                .arg("--no-owner")
209                .arg(format!("--jobs={}", num_cpus)) // Parallel restore jobs
210                .arg("--host")
211                .arg(&parts.host)
212                .arg("--port")
213                .arg(parts.port.to_string())
214                .arg("--dbname")
215                .arg(&parts.database)
216                .arg("--format=directory") // Directory format
217                .arg("--verbose") // Show progress
218                .arg(&input_path_owned)
219                .env("PGPASSFILE", pgpass.path())
220                .stdout(Stdio::inherit())
221                .stderr(Stdio::inherit());
222
223            // Add username if specified
224            if let Some(user) = &parts.user {
225                cmd.arg("--username").arg(user);
226            }
227
228            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
229            for (env_var, value) in &env_vars {
230                cmd.env(env_var, value);
231            }
232
233            // Apply TCP keepalive parameters to prevent idle connection timeouts
234            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
235                cmd.env(env_var, value);
236            }
237
238            // Mitigate hangs on serverless DBs with strict connection limits
239            cmd.env("PGCONNECT_TIMEOUT", "30");
240
241            cmd.status().context(
242                "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
243                 Install with:\n\
244                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
245                 - macOS: brew install postgresql\n\
246                 - RHEL/CentOS: sudo yum install postgresql",
247            )
248        },
249        3,                      // Max 3 retries
250        Duration::from_secs(1), // Start with 1 second delay
251        "pg_restore (restore data)",
252    )
253    .await
254    .context(
255        "Data restoration failed.\n\
256         \n\
257         Common causes:\n\
258         - Foreign key constraint violations\n\
259         - Unique constraint violations (data already exists)\n\
260         - User lacks INSERT privileges on target tables\n\
261         - Disk space issues on target\n\
262         - Data type mismatches\n\
263         - Input directory is not a valid pg_dump directory format\n\
264         - Connection timeout or network issues",
265    )?;
266
267    tracing::info!(
268        "✓ Data restored successfully using {} parallel jobs",
269        num_cpus
270    );
271    Ok(())
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::dump;
    use tempfile::tempdir;

    /// Dumps global objects from `TEST_SOURCE_URL` and restores them into
    /// `TEST_TARGET_URL`.
    ///
    /// Ignored by default because it needs live databases. Run it with
    /// `cargo test -- --ignored` after exporting both environment variables.
    #[tokio::test]
    #[ignore]
    async fn test_restore_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL")
            .expect("TEST_SOURCE_URL must be set to run this ignored integration test");
        let target_url = std::env::var("TEST_TARGET_URL")
            .expect("TEST_TARGET_URL must be set to run this ignored integration test");

        let dir = tempdir().expect("failed to create temporary directory");
        let dump_file = dir.path().join("globals.sql");
        let dump_path = dump_file
            .to_str()
            .expect("temporary dump path is not valid UTF-8");

        // Dump from source
        dump::dump_globals(&source_url, dump_path)
            .await
            .expect("dump_globals failed");

        // Restore to target. Report the full error chain on failure instead of
        // a bare `is_ok()` assertion that hides why it failed.
        if let Err(err) = restore_globals(&target_url, dump_path).await {
            panic!("restore_globals failed: {:#}", err);
        }
    }
}