database_replicator/migration/
restore.rs

1// ABOUTME: Wrapper for psql and pg_restore to import database objects
2// ABOUTME: Restores global objects, schema, and data to target
3
4use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7use tokio::process::Command as TokioCommand;
8
9/// Restore global objects using psql
10pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
11    tracing::info!("Restoring global objects from {}", input_path);
12
13    // Parse URL and create .pgpass file for secure authentication
14    let parts = crate::utils::parse_postgres_url(target_url)
15        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
16    let pgpass = crate::utils::PgPassFile::new(&parts)
17        .context("Failed to create .pgpass file for authentication")?;
18
19    let env_vars = parts.to_pg_env_vars();
20
21    let mut cmd = TokioCommand::new("psql");
22    cmd.arg("--host")
23        .arg(&parts.host)
24        .arg("--port")
25        .arg(parts.port.to_string())
26        .arg("--dbname")
27        .arg(&parts.database)
28        .arg(format!("--file={}", input_path))
29        .arg("--quiet")
30        .arg("-v")
31        .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
32        .env("PGPASSFILE", pgpass.path())
33        .stdout(Stdio::piped())
34        .stderr(Stdio::piped());
35
36    // Add username if specified
37    if let Some(user) = &parts.user {
38        cmd.arg("--username").arg(user);
39    }
40
41    // Apply query parameters as environment variables (SSL, channel_binding, etc.)
42    for (env_var, value) in &env_vars {
43        cmd.env(env_var, value);
44    }
45
46    // Apply TCP keepalive parameters to prevent idle connection timeouts
47    for (env_var, value) in crate::utils::get_keepalive_env_vars() {
48        cmd.env(env_var, value);
49    }
50
51    // Mitigate hangs on serverless DBs with strict connection limits
52    cmd.env("PGCONNECT_TIMEOUT", "30");
53
54    let output = cmd.output().await.context(
55        "Failed to execute psql. Is PostgreSQL client installed?\n\
56         Install with:\n\
57         - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58         - macOS: brew install postgresql\n\
59         - RHEL/CentOS: sudo yum install postgresql",
60    )?;
61
62    if output.status.success() {
63        tracing::info!("✓ Global objects restored");
64        // Even on success, there might be NOTICES or WARNINGS on stderr
65        let stderr = String::from_utf8_lossy(&output.stderr);
66        if !stderr.trim().is_empty() {
67            tracing::debug!("psql stderr output:\n{}", stderr);
68        }
69        Ok(())
70    } else {
71        let stderr = String::from_utf8_lossy(&output.stderr);
72        let stdout = String::from_utf8_lossy(&output.stdout);
73
74        // Check for common, non-fatal warnings
75        if stderr.contains("already exists") && stderr.contains("skipping") {
76            tracing::warn!(
77                "⚠ Some global objects already exist, which is usually safe. Full output:\n{}",
78                stderr
79            );
80            Ok(()) // Treat as success
81        } else {
82            // A real error occurred
83            anyhow::bail!(
84                "Failed to restore global objects.\n\
85                 Exit Code: {}\n\
86                 Stderr:\n{}\n\
87                 Stdout:\n{}",
88                output.status,
89                stderr,
90                stdout
91            );
92        }
93    }
94}
95
96/// Restore schema using psql
97pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
98    tracing::info!("Restoring schema from {}", input_path);
99
100    // Parse URL and create .pgpass file for secure authentication
101    let parts = crate::utils::parse_postgres_url(target_url)
102        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
103    let pgpass = crate::utils::PgPassFile::new(&parts)
104        .context("Failed to create .pgpass file for authentication")?;
105
106    let env_vars = parts.to_pg_env_vars();
107    let input_path_owned = input_path.to_string();
108
109    // Wrap subprocess execution with retry logic
110    crate::utils::retry_subprocess_with_backoff(
111        || {
112            let mut cmd = Command::new("psql");
113            cmd.arg("--host")
114                .arg(&parts.host)
115                .arg("--port")
116                .arg(parts.port.to_string())
117                .arg("--dbname")
118                .arg(&parts.database)
119                .arg(format!("--file={}", input_path_owned))
120                .arg("--quiet")
121                .arg("-v")
122                .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
123                .env("PGPASSFILE", pgpass.path())
124                .stdout(Stdio::inherit())
125                .stderr(Stdio::inherit());
126
127            // Add username if specified
128            if let Some(user) = &parts.user {
129                cmd.arg("--username").arg(user);
130            }
131
132            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
133            for (env_var, value) in &env_vars {
134                cmd.env(env_var, value);
135            }
136
137            // Apply TCP keepalive parameters to prevent idle connection timeouts
138            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
139                cmd.env(env_var, value);
140            }
141
142            // Mitigate hangs on serverless DBs with strict connection limits
143            cmd.env("PGCONNECT_TIMEOUT", "30");
144
145            cmd.status().context(
146                "Failed to execute psql. Is PostgreSQL client installed?\n\
147                 Install with:\n\
148                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
149                 - macOS: brew install postgresql\n\
150                 - RHEL/CentOS: sudo yum install postgresql",
151            )
152        },
153        3,                      // Max 3 retries
154        Duration::from_secs(1), // Start with 1 second delay
155        "psql (restore schema)",
156    )
157    .await
158    .context(
159        "Schema restoration failed.\n\
160         \n\
161         Common causes:\n\
162         - Target database does not exist\n\
163         - User lacks CREATE privileges on target\n\
164         - Schema objects already exist (try dropping them first)\n\
165         - Version incompatibility between source and target\n\
166         - Syntax errors in dump file\n\
167         - Connection timeout or network issues",
168    )?;
169
170    tracing::info!("✓ Schema restored successfully");
171    Ok(())
172}
173
174/// Restore data using pg_restore with parallel jobs
175///
176/// Uses PostgreSQL directory format restore with:
177/// - Parallel restore for faster performance
178/// - Automatic decompression of compressed dump files
179/// - Optimized for directory format dumps created by dump_data()
180///
181/// The number of parallel jobs is automatically determined based on available CPU cores.
182///
183/// # Note on Retry Behavior
184///
185/// Unlike schema restoration, data restoration does NOT use retry logic. This is
186/// intentional because pg_restore with --data-only is NOT idempotent - if it partially
187/// succeeds and then fails, retrying would cause duplicate key constraint violations.
188///
189/// If data restoration fails due to connection issues, the user should re-run the
190/// command with --drop-existing to ensure a clean slate.
191pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
192    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
193    let num_cpus = std::thread::available_parallelism()
194        .map(|n| n.get().min(8))
195        .unwrap_or(4);
196
197    tracing::info!(
198        "Restoring data from {} (parallel={}, format=directory)",
199        input_path,
200        num_cpus
201    );
202
203    // Parse URL and create .pgpass file for secure authentication
204    let parts = crate::utils::parse_postgres_url(target_url)
205        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
206    let pgpass = crate::utils::PgPassFile::new(&parts)
207        .context("Failed to create .pgpass file for authentication")?;
208
209    let env_vars = parts.to_pg_env_vars();
210
211    // NOTE: We intentionally do NOT use retry_subprocess_with_backoff here.
212    // pg_restore with --data-only is NOT idempotent - if it partially succeeds
213    // (inserts some rows) and then fails, retrying would cause duplicate key
214    // constraint violations because the already-inserted rows would be re-inserted.
215    //
216    // If data restoration fails, the user should re-run with --drop-existing to
217    // ensure a clean database before retry.
218    let mut cmd = Command::new("pg_restore");
219    cmd.arg("--data-only")
220        .arg("--no-owner")
221        .arg(format!("--jobs={}", num_cpus)) // Parallel restore jobs
222        .arg("--host")
223        .arg(&parts.host)
224        .arg("--port")
225        .arg(parts.port.to_string())
226        .arg("--dbname")
227        .arg(&parts.database)
228        .arg("--format=directory") // Directory format
229        .arg("--verbose") // Show progress
230        .arg(input_path)
231        .env("PGPASSFILE", pgpass.path())
232        .stdout(Stdio::inherit())
233        .stderr(Stdio::inherit());
234
235    // Add username if specified
236    if let Some(user) = &parts.user {
237        cmd.arg("--username").arg(user);
238    }
239
240    // Apply query parameters as environment variables (SSL, channel_binding, etc.)
241    for (env_var, value) in &env_vars {
242        cmd.env(env_var, value);
243    }
244
245    // Apply TCP keepalive parameters to prevent idle connection timeouts
246    for (env_var, value) in crate::utils::get_keepalive_env_vars() {
247        cmd.env(env_var, value);
248    }
249
250    // Mitigate hangs on serverless DBs with strict connection limits
251    cmd.env("PGCONNECT_TIMEOUT", "30");
252
253    let status = cmd.status().context(
254        "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
255         Install with:\n\
256         - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
257         - macOS: brew install postgresql\n\
258         - RHEL/CentOS: sudo yum install postgresql",
259    )?;
260
261    if !status.success() {
262        anyhow::bail!(
263            "Data restoration failed (exit code: {}).\n\
264             \n\
265             Common causes:\n\
266             - Foreign key constraint violations\n\
267             - Unique constraint violations (data already exists from a previous partial restore)\n\
268             - User lacks INSERT privileges on target tables\n\
269             - Disk space issues on target\n\
270             - Data type mismatches\n\
271             - Input directory is not a valid pg_dump directory format\n\
272             - Connection timeout or network issues\n\
273             \n\
274             If you see 'duplicate key' errors, re-run with --drop-existing to ensure a clean database.",
275            status.code().unwrap_or(-1)
276        );
277    }
278
279    tracing::info!(
280        "✓ Data restored successfully using {} parallel jobs",
281        num_cpus
282    );
283    Ok(())
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::dump;
    use tempfile::tempdir;

    /// Round-trips global objects: dumps them from the source database into a
    /// temporary file and restores that file into the target database.
    ///
    /// Ignored by default because it reads connection URLs from the
    /// `TEST_SOURCE_URL` and `TEST_TARGET_URL` environment variables.
    #[tokio::test]
    #[ignore]
    async fn test_restore_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL")
            .expect("TEST_SOURCE_URL must be set to run this test");
        let target_url = std::env::var("TEST_TARGET_URL")
            .expect("TEST_TARGET_URL must be set to run this test");

        let dir = tempdir().expect("failed to create temporary directory");
        let dump_file = dir.path().join("globals.sql");
        let dump_path = dump_file
            .to_str()
            .expect("temporary dump path should be valid UTF-8");

        // Dump from source
        dump::dump_globals(&source_url, dump_path)
            .await
            .expect("dumping globals from source failed");

        // Restore to target; `expect` surfaces the full error instead of a
        // bare assertion failure.
        restore_globals(&target_url, dump_path)
            .await
            .expect("restoring globals to target failed");
    }
}