database_replicator/migration/
restore.rs

// ABOUTME: Wrapper around psql and pg_restore for importing database objects
// ABOUTME: Restores global objects, schema, and data to the target database
3
4use anyhow::{Context, Result};
5use std::process::{Command, Stdio};
6use std::time::Duration;
7
8/// Restore global objects using psql
9pub async fn restore_globals(target_url: &str, input_path: &str) -> Result<()> {
10    tracing::info!("Restoring global objects from {}", input_path);
11
12    // Parse URL and create .pgpass file for secure authentication
13    let parts = crate::utils::parse_postgres_url(target_url)
14        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
15    let pgpass = crate::utils::PgPassFile::new(&parts)
16        .context("Failed to create .pgpass file for authentication")?;
17
18    let env_vars = parts.to_pg_env_vars();
19    let input_path_owned = input_path.to_string();
20
21    // Wrap subprocess execution with retry logic
22    let result = crate::utils::retry_subprocess_with_backoff(
23        || {
24            let mut cmd = Command::new("psql");
25            cmd.arg("--host")
26                .arg(&parts.host)
27                .arg("--port")
28                .arg(parts.port.to_string())
29                .arg("--dbname")
30                .arg(&parts.database)
31                .arg(format!("--file={}", input_path_owned))
32                .arg("--quiet")
33                .arg("-v")
34                .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
35                .env("PGPASSFILE", pgpass.path())
36                .stdout(Stdio::inherit())
37                .stderr(Stdio::inherit());
38
39            // Add username if specified
40            if let Some(user) = &parts.user {
41                cmd.arg("--username").arg(user);
42            }
43
44            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
45            for (env_var, value) in &env_vars {
46                cmd.env(env_var, value);
47            }
48
49            // Apply TCP keepalive parameters to prevent idle connection timeouts
50            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
51                cmd.env(env_var, value);
52            }
53
54            cmd.status().context(
55                "Failed to execute psql. Is PostgreSQL client installed?\n\
56                 Install with:\n\
57                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
58                 - macOS: brew install postgresql\n\
59                 - RHEL/CentOS: sudo yum install postgresql",
60            )
61        },
62        3,                      // Max 3 retries
63        Duration::from_secs(1), // Start with 1 second delay
64        "psql (restore globals)",
65    );
66
67    // Handle result - don't fail on warnings for global objects
68    match result {
69        Ok(()) => {
70            tracing::info!("✓ Global objects restored");
71            Ok(())
72        }
73        Err(e) => {
74            tracing::warn!("⚠ Some global object restoration warnings occurred: {}", e);
75            // Don't fail - some errors are expected (roles may already exist)
76            Ok(())
77        }
78    }
79}
80
81/// Restore schema using psql
82pub async fn restore_schema(target_url: &str, input_path: &str) -> Result<()> {
83    tracing::info!("Restoring schema from {}", input_path);
84
85    // Parse URL and create .pgpass file for secure authentication
86    let parts = crate::utils::parse_postgres_url(target_url)
87        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
88    let pgpass = crate::utils::PgPassFile::new(&parts)
89        .context("Failed to create .pgpass file for authentication")?;
90
91    let env_vars = parts.to_pg_env_vars();
92    let input_path_owned = input_path.to_string();
93
94    // Wrap subprocess execution with retry logic
95    crate::utils::retry_subprocess_with_backoff(
96        || {
97            let mut cmd = Command::new("psql");
98            cmd.arg("--host")
99                .arg(&parts.host)
100                .arg("--port")
101                .arg(parts.port.to_string())
102                .arg("--dbname")
103                .arg(&parts.database)
104                .arg(format!("--file={}", input_path_owned))
105                .arg("--quiet")
106                .arg("-v")
107                .arg("ON_ERROR_STOP=1") // Stop on first error for better visibility
108                .env("PGPASSFILE", pgpass.path())
109                .stdout(Stdio::inherit())
110                .stderr(Stdio::inherit());
111
112            // Add username if specified
113            if let Some(user) = &parts.user {
114                cmd.arg("--username").arg(user);
115            }
116
117            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
118            for (env_var, value) in &env_vars {
119                cmd.env(env_var, value);
120            }
121
122            // Apply TCP keepalive parameters to prevent idle connection timeouts
123            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
124                cmd.env(env_var, value);
125            }
126
127            cmd.status().context(
128                "Failed to execute psql. Is PostgreSQL client installed?\n\
129                 Install with:\n\
130                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
131                 - macOS: brew install postgresql\n\
132                 - RHEL/CentOS: sudo yum install postgresql",
133            )
134        },
135        3,                      // Max 3 retries
136        Duration::from_secs(1), // Start with 1 second delay
137        "psql (restore schema)",
138    )
139    .context(
140        "Schema restoration failed.\n\
141         \n\
142         Common causes:\n\
143         - Target database does not exist\n\
144         - User lacks CREATE privileges on target\n\
145         - Schema objects already exist (try dropping them first)\n\
146         - Version incompatibility between source and target\n\
147         - Syntax errors in dump file\n\
148         - Connection timeout or network issues",
149    )?;
150
151    tracing::info!("✓ Schema restored successfully");
152    Ok(())
153}
154
155/// Restore data using pg_restore with parallel jobs
156///
157/// Uses PostgreSQL directory format restore with:
158/// - Parallel restore for faster performance
159/// - Automatic decompression of compressed dump files
160/// - Optimized for directory format dumps created by dump_data()
161///
162/// The number of parallel jobs is automatically determined based on available CPU cores.
163pub async fn restore_data(target_url: &str, input_path: &str) -> Result<()> {
164    // Determine optimal number of parallel jobs (number of CPUs, capped at 8)
165    let num_cpus = std::thread::available_parallelism()
166        .map(|n| n.get().min(8))
167        .unwrap_or(4);
168
169    tracing::info!(
170        "Restoring data from {} (parallel={}, format=directory)",
171        input_path,
172        num_cpus
173    );
174
175    // Parse URL and create .pgpass file for secure authentication
176    let parts = crate::utils::parse_postgres_url(target_url)
177        .with_context(|| format!("Failed to parse target URL: {}", target_url))?;
178    let pgpass = crate::utils::PgPassFile::new(&parts)
179        .context("Failed to create .pgpass file for authentication")?;
180
181    let env_vars = parts.to_pg_env_vars();
182    let input_path_owned = input_path.to_string();
183
184    // Wrap subprocess execution with retry logic
185    crate::utils::retry_subprocess_with_backoff(
186        || {
187            let mut cmd = Command::new("pg_restore");
188            cmd.arg("--data-only")
189                .arg("--no-owner")
190                .arg(format!("--jobs={}", num_cpus)) // Parallel restore jobs
191                .arg("--host")
192                .arg(&parts.host)
193                .arg("--port")
194                .arg(parts.port.to_string())
195                .arg("--dbname")
196                .arg(&parts.database)
197                .arg("--format=directory") // Directory format
198                .arg("--verbose") // Show progress
199                .arg(&input_path_owned)
200                .env("PGPASSFILE", pgpass.path())
201                .stdout(Stdio::inherit())
202                .stderr(Stdio::inherit());
203
204            // Add username if specified
205            if let Some(user) = &parts.user {
206                cmd.arg("--username").arg(user);
207            }
208
209            // Apply query parameters as environment variables (SSL, channel_binding, etc.)
210            for (env_var, value) in &env_vars {
211                cmd.env(env_var, value);
212            }
213
214            // Apply TCP keepalive parameters to prevent idle connection timeouts
215            for (env_var, value) in crate::utils::get_keepalive_env_vars() {
216                cmd.env(env_var, value);
217            }
218
219            cmd.status().context(
220                "Failed to execute pg_restore. Is PostgreSQL client installed?\n\
221                 Install with:\n\
222                 - Ubuntu/Debian: sudo apt-get install postgresql-client\n\
223                 - macOS: brew install postgresql\n\
224                 - RHEL/CentOS: sudo yum install postgresql",
225            )
226        },
227        3,                      // Max 3 retries
228        Duration::from_secs(1), // Start with 1 second delay
229        "pg_restore (restore data)",
230    )
231    .context(
232        "Data restoration failed.\n\
233         \n\
234         Common causes:\n\
235         - Foreign key constraint violations\n\
236         - Unique constraint violations (data already exists)\n\
237         - User lacks INSERT privileges on target tables\n\
238         - Disk space issues on target\n\
239         - Data type mismatches\n\
240         - Input directory is not a valid pg_dump directory format\n\
241         - Connection timeout or network issues",
242    )?;
243
244    tracing::info!(
245        "✓ Data restored successfully using {} parallel jobs",
246        num_cpus
247    );
248    Ok(())
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::migration::dump;
    use tempfile::tempdir;

    /// Dumps global objects from `TEST_SOURCE_URL` into a temporary file and
    /// restores that file into `TEST_TARGET_URL`.
    ///
    /// Ignored by default; both environment variables must be set when the
    /// test is run explicitly.
    #[tokio::test]
    #[ignore]
    async fn test_restore_globals() {
        let source = std::env::var("TEST_SOURCE_URL").unwrap();
        let target = std::env::var("TEST_TARGET_URL").unwrap();

        // Scratch directory that holds the intermediate dump file
        let workdir = tempdir().unwrap();
        let globals_file = workdir.path().join("globals.sql");
        let globals_path = globals_file.to_str().unwrap();

        // Source -> file
        dump::dump_globals(&source, globals_path).await.unwrap();

        // File -> target
        let result = restore_globals(&target, globals_path).await;
        assert!(result.is_ok());
    }
}