database_replicator/migration/restore.rs

1// ABOUTME: Wrapper for psql and pg_restore to import database objects
2// ABOUTME: Restores global objects, schema, and data to target
3
4use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7
8/// Restore global objects using psql
9pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
10    tracing::info!("Restoring global objects from {}", input_path);
11
12    // Parse URL and create .pgpass file for secure authentication
13    let parts = crate::utils::parse_postgres_url(target_url)
14        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
15    let pgpass = crate::utils::PgPassFile::new(&parts)
16        .context("Failed to create .pgpass file for authentication")?;
17
18    let env_vars = parts.to_pg_env_vars();
19    let input_path_owned = input_path.to_string();
20
21    // Wrap subprocess execution with retry logic
22    let result = crate::utils::retry_subprocess_with_backoff(
23        || {
24            let mut cmd = Command::new("psql");
25            cmd.arg("--host")
26                .arg(&parts.host)
27                .arg("--port")
28                .arg(parts.port.to_string())
29                .arg("--dbname")
30                .arg(&parts.database)
31                .arg(format!("--file={}", input_path_owned))
32                .arg("--quiet")
33                .arg("-v")
34                .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
35                .env("PGPASSFILE", pgpass.path())
36                .stdout(Stdio::inherit())
37                .stderr(Stdio::inherit());
38
39            // Add username if specified
40            if let Some(user) = &parts.user {
41                cmd.arg("--username").arg(user);
42            }
43
44            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
45            for (env_var, value) in &env_vars {
46                cmd.env(env_var, value);
47            }
48
49            // Apply TCP keepalive parameters to prevent idle connection timeouts
50            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
51                cmd.env(env_var, value);
52            }
53
54            cmd.status().context(
55                "Failed to execute psql. Is PostgreSQL client installed?\n\
56                 Install with:\n\
57                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58                 - macOS: brew install postgresql\n\
59                 - RHEL/CentOS: sudo yum install postgresql",
60            )
61        },
62        3,                      // Max 3 retries
63        Duration::from_secs(1), // Start with 1 second delay
64        "psql (restore globals)",
65    )
66    .await;
67
68    // Handle result - don't fail on warnings for global objects
69    match result {
70        Ok(()) => {
71            tracing::info!("✓ Global objects restored");
72            Ok(())
73        }
74        Err(e) => {
75            tracing::warn!("⚠ Some global object restoration warnings occurred: {}", e);
76            // Don't fail - some errors are expected (roles may already exist)
77            Ok(())
78        }
79    }
80}
81
82/// Restore schema using psql
83pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
84    tracing::info!("Restoring schema from {}", input_path);
85
86    // Parse URL and create .pgpass file for secure authentication
87    let parts = crate::utils::parse_postgres_url(target_url)
88        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
89    let pgpass = crate::utils::PgPassFile::new(&parts)
90        .context("Failed to create .pgpass file for authentication")?;
91
92    let env_vars = parts.to_pg_env_vars();
93    let input_path_owned = input_path.to_string();
94
95    // Wrap subprocess execution with retry logic
96    crate::utils::retry_subprocess_with_backoff(
97        || {
98            let mut cmd = Command::new("psql");
99            cmd.arg("--host")
100                .arg(&parts.host)
101                .arg("--port")
102                .arg(parts.port.to_string())
103                .arg("--dbname")
104                .arg(&parts.database)
105                .arg(format!("--file={}", input_path_owned))
106                .arg("--quiet")
107                .arg("-v")
108                .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
109                .env("PGPASSFILE", pgpass.path())
110                .stdout(Stdio::inherit())
111                .stderr(Stdio::inherit());
112
113            // Add username if specified
114            if let Some(user) = &parts.user {
115                cmd.arg("--username").arg(user);
116            }
117
118            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
119            for (env_var, value) in &env_vars {
120                cmd.env(env_var, value);
121            }
122
123            // Apply TCP keepalive parameters to prevent idle connection timeouts
124            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
125                cmd.env(env_var, value);
126            }
127
128            cmd.status().context(
129                "Failed to execute psql. Is PostgreSQL client installed?\n\
130                 Install with:\n\
131                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
132                 - macOS: brew install postgresql\n\
133                 - RHEL/CentOS: sudo yum install postgresql",
134            )
135        },
136        3,                      // Max 3 retries
137        Duration::from_secs(1), // Start with 1 second delay
138        "psql (restore schema)",
139    )
140    .await
141    .context(
142        "Schema restoration failed.\n\
143         \n\
144         Common causes:\n\
145         - Target database does not exist\n\
146         - User lacks CREATE privileges on target\n\
147         - Schema objects already exist (try dropping them first)\n\
148         - Version incompatibility between source and target\n\
149         - Syntax errors in dump file\n\
150         - Connection timeout or network issues",
151    )?;
152
153    tracing::info!("✓ Schema restored successfully");
154    Ok(())
155}
156
157/// Restore data using pg_restore with parallel jobs
158///
159/// Uses PostgreSQL directory format restore with:
160/// - Parallel restore for faster performance
161/// - Automatic decompression of compressed dump files
162/// - Optimized for directory format dumps created by dump_data()
163///
164/// The number of parallel jobs is automatically determined based on available CPU cores.
165pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
166    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
167    let num_cpus = std::thread::available_parallelism()
168        .map(|n| n.get().min(8))
169        .unwrap_or(4);
170
171    tracing::info!(
172        "Restoring data from {} (parallel={}, format=directory)",
173        input_path,
174        num_cpus
175    );
176
177    // Parse URL and create .pgpass file for secure authentication
178    let parts = crate::utils::parse_postgres_url(target_url)
179        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
180    let pgpass = crate::utils::PgPassFile::new(&parts)
181        .context("Failed to create .pgpass file for authentication")?;
182
183    let env_vars = parts.to_pg_env_vars();
184    let input_path_owned = input_path.to_string();
185
186    // Wrap subprocess execution with retry logic
187    crate::utils::retry_subprocess_with_backoff(
188        || {
189            let mut cmd = Command::new("pg_restore");
190            cmd.arg("--data-only")
191                .arg("--no-owner")
192                .arg(format!("--jobs={}", num_cpus)) // Parallel restore jobs
193                .arg("--host")
194                .arg(&parts.host)
195                .arg("--port")
196                .arg(parts.port.to_string())
197                .arg("--dbname")
198                .arg(&parts.database)
199                .arg("--format=directory") // Directory format
200                .arg("--verbose") // Show progress
201                .arg(&input_path_owned)
202                .env("PGPASSFILE", pgpass.path())
203                .stdout(Stdio::inherit())
204                .stderr(Stdio::inherit());
205
206            // Add username if specified
207            if let Some(user) = &parts.user {
208                cmd.arg("--username").arg(user);
209            }
210
211            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
212            for (env_var, value) in &env_vars {
213                cmd.env(env_var, value);
214            }
215
216            // Apply TCP keepalive parameters to prevent idle connection timeouts
217            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
218                cmd.env(env_var, value);
219            }
220
221            cmd.status().context(
222                "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
223                 Install with:\n\
224                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
225                 - macOS: brew install postgresql\n\
226                 - RHEL/CentOS: sudo yum install postgresql",
227            )
228        },
229        3,                      // Max 3 retries
230        Duration::from_secs(1), // Start with 1 second delay
231        "pg_restore (restore data)",
232    )
233    .await
234    .context(
235        "Data restoration failed.\n\
236         \n\
237         Common causes:\n\
238         - Foreign key constraint violations\n\
239         - Unique constraint violations (data already exists)\n\
240         - User lacks INSERT privileges on target tables\n\
241         - Disk space issues on target\n\
242         - Data type mismatches\n\
243         - Input directory is not a valid pg_dump directory format\n\
244         - Connection timeout or network issues",
245    )?;
246
247    tracing::info!(
248        "✓ Data restored successfully using {} parallel jobs",
249        num_cpus
250    );
251    Ok(())
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::dump;
    use tempfile::tempdir;

    /// End-to-end check: dump globals from `TEST_SOURCE_URL`, then restore them into
    /// `TEST_TARGET_URL`.
    ///
    /// Marked `#[ignore]` because it needs live databases. Run it with
    /// `cargo test -- --ignored` after exporting both variables.
    #[tokio::test]
    #[ignore]
    async fn test_restore_globals() {
        let source_url = std::env::var("TEST_SOURCE_URL")
            .expect("TEST_SOURCE_URL must be set to run this test");
        let target_url = std::env::var("TEST_TARGET_URL")
            .expect("TEST_TARGET_URL must be set to run this test");

        let dir = tempdir().expect("failed to create temporary directory");
        let dump_file = dir.path().join("globals.sql");
        let dump_path = dump_file
            .to_str()
            .expect("temporary dump path is not valid UTF-8");

        // Dump from source
        dump::dump_globals(&source_url, dump_path)
            .await
            .expect("dumping globals from source failed");

        // Restore to target; `expect` prints the full error chain on failure instead of
        // the bare "assertion failed" that `assert!(result.is_ok())` would give.
        restore_globals(&target_url, dump_path)
            .await
            .expect("restoring globals into target failed");
    }
}