// codex_memory/backup/backup_manager.rs

use super::repository::{BackupRepository, PostgresBackupRepository};
use super::{BackupConfig, BackupError, BackupMetadata, BackupStatus, BackupType, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::process::Command;
use std::sync::Arc;
use tokio::fs;
use tokio::io::AsyncReadExt;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
11
/// Main backup manager responsible for orchestrating all backup operations
pub struct BackupManager {
    // Tunables: backup/WAL directories, retention, RTO/RPO targets,
    // encryption and replication feature flags.
    config: BackupConfig,
    // Metadata persistence and Postgres introspection, behind a trait
    // object so tests can inject an alternative (see `with_repository`).
    repository: Arc<dyn BackupRepository>,
}
17
18impl BackupManager {
19    pub fn new(config: BackupConfig, db_pool: Arc<sqlx::PgPool>) -> Self {
20        let repository = Arc::new(PostgresBackupRepository::new(db_pool));
21        Self { config, repository }
22    }
23
24    pub fn with_repository(config: BackupConfig, repository: Arc<dyn BackupRepository>) -> Self {
25        Self { config, repository }
26    }
27
28    /// Initialize the backup manager and create necessary directories
29    pub async fn initialize(&self) -> Result<()> {
30        info!("Initializing backup manager");
31
32        // Create backup directories
33        fs::create_dir_all(&self.config.backup_directory).await?;
34        fs::create_dir_all(&self.config.wal_archive_directory).await?;
35
36        // Initialize repository
37        self.repository.initialize().await?;
38
39        // Verify PostgreSQL configuration
40        self.repository.verify_postgres_config().await?;
41
42        info!("Backup manager initialized successfully");
43        Ok(())
44    }
45
46    /// Perform a full database backup
47    pub async fn create_full_backup(&self) -> Result<BackupMetadata> {
48        let backup_id = Uuid::new_v4().to_string();
49        let start_time = Utc::now();
50
51        info!("Starting full backup: {}", backup_id);
52
53        let backup_filename = format!(
54            "full_backup_{}_{}.sql.gz",
55            start_time.format("%Y%m%d_%H%M%S"),
56            backup_id
57        );
58        let backup_path = self.config.backup_directory.join(&backup_filename);
59
60        // Check available disk space before starting backup
61        self.check_disk_space(&backup_path).await?;
62
63        let mut metadata = BackupMetadata {
64            id: backup_id.clone(),
65            backup_type: BackupType::Full,
66            status: BackupStatus::InProgress,
67            start_time,
68            end_time: None,
69            size_bytes: 0,
70            compressed_size_bytes: 0,
71            file_path: backup_path.clone(),
72            checksum: String::new(),
73            database_name: self.extract_database_name()?,
74            wal_start_lsn: None,
75            wal_end_lsn: None,
76            encryption_enabled: self.config.enable_encryption,
77            replication_status: std::collections::HashMap::new(),
78            verification_status: None,
79        };
80
81        // Store initial metadata
82        self.repository.store_metadata(&metadata).await?;
83
84        // Get WAL start LSN
85        let start_lsn = self.repository.get_current_wal_lsn().await?;
86        metadata.wal_start_lsn = Some(start_lsn);
87
88        // Perform the backup using pg_dump
89        match self
90            .execute_pg_dump(&backup_path, &metadata.database_name)
91            .await
92        {
93            Ok(_) => {
94                let end_time = Utc::now();
95                metadata.end_time = Some(end_time);
96                metadata.status = BackupStatus::Completed;
97
98                // Get WAL end LSN
99                let end_lsn = self.repository.get_current_wal_lsn().await?;
100                metadata.wal_end_lsn = Some(end_lsn);
101
102                // Calculate file sizes and checksum
103                let file_metadata = fs::metadata(&backup_path).await?;
104                metadata.compressed_size_bytes = file_metadata.len();
105                metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
106
107                // Update metadata
108                self.repository.update_metadata(&metadata).await?;
109
110                info!(
111                    "Full backup completed successfully: {} ({} bytes)",
112                    backup_id, metadata.compressed_size_bytes
113                );
114
115                // Trigger replication if enabled
116                if self.config.enable_replication {
117                    self.replicate_backup(&metadata).await?;
118                }
119
120                Ok(metadata)
121            }
122            Err(e) => {
123                metadata.status = BackupStatus::Failed;
124                metadata.end_time = Some(Utc::now());
125                self.repository.update_metadata(&metadata).await?;
126
127                error!("Full backup failed: {}", e);
128                Err(e)
129            }
130        }
131    }
132
133    /// Create an incremental backup (WAL archives since last backup)
134    pub async fn create_incremental_backup(&self) -> Result<BackupMetadata> {
135        let backup_id = Uuid::new_v4().to_string();
136        let start_time = Utc::now();
137
138        info!("Starting incremental backup: {}", backup_id);
139
140        // Get the last backup to determine starting point
141        let last_backup = self.repository.get_latest_backup().await?;
142        let start_lsn = match last_backup {
143            Some(backup) => {
144                if let Some(lsn) = backup.wal_end_lsn {
145                    lsn
146                } else {
147                    warn!("Last backup has no end LSN, using current LSN");
148                    self.repository
149                        .get_current_wal_lsn()
150                        .await
151                        .unwrap_or_default()
152                }
153            }
154            None => {
155                return Err(BackupError::BackupFailed {
156                    message: "No previous backup found for incremental backup".to_string(),
157                });
158            }
159        };
160
161        let backup_filename = format!(
162            "incremental_backup_{}_{}.tar.gz",
163            start_time.format("%Y%m%d_%H%M%S"),
164            backup_id
165        );
166        let backup_path = self.config.backup_directory.join(&backup_filename);
167
168        let mut metadata = BackupMetadata {
169            id: backup_id.clone(),
170            backup_type: BackupType::Incremental,
171            status: BackupStatus::InProgress,
172            start_time,
173            end_time: None,
174            size_bytes: 0,
175            compressed_size_bytes: 0,
176            file_path: backup_path.clone(),
177            checksum: String::new(),
178            database_name: self.extract_database_name()?,
179            wal_start_lsn: Some(start_lsn.clone()),
180            wal_end_lsn: None,
181            encryption_enabled: self.config.enable_encryption,
182            replication_status: std::collections::HashMap::new(),
183            verification_status: None,
184        };
185
186        // Store initial metadata
187        self.repository.store_metadata(&metadata).await?;
188
189        // Create incremental backup by archiving WAL files
190        match self.archive_wal_files(&start_lsn, &backup_path).await {
191            Ok(end_lsn) => {
192                let end_time = Utc::now();
193                metadata.end_time = Some(end_time);
194                metadata.status = BackupStatus::Completed;
195                metadata.wal_end_lsn = Some(end_lsn);
196
197                // Calculate file sizes and checksum
198                if backup_path.exists() {
199                    let file_metadata = fs::metadata(&backup_path).await?;
200                    metadata.compressed_size_bytes = file_metadata.len();
201                    metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
202                } else {
203                    warn!("No new WAL files to archive since last backup");
204                }
205
206                // Update metadata
207                self.repository.update_metadata(&metadata).await?;
208
209                info!("Incremental backup completed successfully: {}", backup_id);
210                Ok(metadata)
211            }
212            Err(e) => {
213                metadata.status = BackupStatus::Failed;
214                metadata.end_time = Some(Utc::now());
215                self.repository.update_metadata(&metadata).await?;
216
217                error!("Incremental backup failed: {}", e);
218                Err(e)
219            }
220        }
221    }
222
223    /// Clean up expired backups based on retention policy
224    pub async fn cleanup_expired_backups(&self) -> Result<u32> {
225        info!(
226            "Starting backup cleanup with retention policy of {} days",
227            self.config.retention_days
228        );
229
230        let expired_backups = self
231            .repository
232            .get_expired_backups(self.config.retention_days)
233            .await?;
234        let mut cleanup_count = 0;
235
236        for backup in expired_backups {
237            match self.delete_backup(&backup).await {
238                Ok(_) => {
239                    cleanup_count += 1;
240                    info!("Deleted expired backup: {}", backup.id);
241                }
242                Err(e) => {
243                    error!("Failed to delete expired backup {}: {}", backup.id, e);
244                }
245            }
246        }
247
248        info!("Cleanup completed: {} backups deleted", cleanup_count);
249        Ok(cleanup_count)
250    }
251
252    /// Get backup statistics and health metrics
253    pub async fn get_backup_statistics(&self) -> Result<BackupStatistics> {
254        let total_backups = self.repository.count_backups().await?;
255        let recent_backups = self.repository.get_recent_backups(7).await?;
256
257        let successful_backups = recent_backups
258            .iter()
259            .filter(|b| b.status == BackupStatus::Completed)
260            .count();
261
262        let failed_backups = recent_backups
263            .iter()
264            .filter(|b| b.status == BackupStatus::Failed)
265            .count();
266
267        let total_backup_size = recent_backups
268            .iter()
269            .filter(|b| b.status == BackupStatus::Completed)
270            .map(|b| b.compressed_size_bytes)
271            .sum();
272
273        let latest_backup = self.repository.get_latest_backup().await?;
274        let backup_frequency_met = if let Some(latest) = &latest_backup {
275            let hours_since_last = Utc::now()
276                .signed_duration_since(latest.start_time)
277                .num_hours();
278            hours_since_last <= 24 // Should have at least one backup per day
279        } else {
280            false
281        };
282
283        Ok(BackupStatistics {
284            total_backups,
285            successful_backups_last_7_days: successful_backups as u32,
286            failed_backups_last_7_days: failed_backups as u32,
287            total_backup_size_bytes: total_backup_size,
288            latest_backup_time: latest_backup.map(|b| b.start_time),
289            backup_frequency_met,
290            rto_target_minutes: self.config.rto_minutes,
291            rpo_target_minutes: self.config.rpo_minutes,
292        })
293    }
294
295    // Private helper methods
296
297    // Removed - now handled by repository
298
    /// Return the name of the database being backed up.
    ///
    /// NOTE(review): hard-coded to "codex_memory" — the comments below say
    /// this should come from the connection string; confirm against how
    /// the database URL is actually stored before relying on this for
    /// multi-database deployments.
    fn extract_database_name(&self) -> Result<String> {
        // Extract database name from connection string
        // This is a simplified implementation
        Ok("codex_memory".to_string())
    }
304
305    // Removed - now handled by repository
306
307    /// Retry database operations with exponential backoff
308    async fn retry_database_operation<T, F, Fut>(&self, operation: F) -> Result<T>
309    where
310        F: Fn() -> Fut,
311        Fut: std::future::Future<Output = Result<T>>,
312    {
313        let mut retries = 0;
314        let max_retries = 3;
315
316        loop {
317            match operation().await {
318                Ok(result) => return Ok(result),
319                Err(e) => {
320                    if retries >= max_retries {
321                        return Err(e);
322                    }
323
324                    retries += 1;
325                    let delay = std::time::Duration::from_millis(100 * (1 << retries));
326                    warn!(
327                        "Database operation failed (attempt {}), retrying in {:?}: {}",
328                        retries, delay, e
329                    );
330                    tokio::time::sleep(delay).await;
331                }
332            }
333        }
334    }
335
336    /// Check if there's sufficient disk space for the backup operation
337    async fn check_disk_space(&self, backup_path: &Path) -> Result<()> {
338        let backup_dir = backup_path.parent().unwrap_or_else(|| Path::new("/"));
339
340        // Check if the directory is writable and has some space
341        let test_file = backup_dir.join(".backup_space_test");
342        match fs::write(&test_file, b"test").await {
343            Ok(_) => {
344                let _ = fs::remove_file(&test_file).await;
345                debug!("Disk space check passed for {}", backup_dir.display());
346                Ok(())
347            }
348            Err(e) => {
349                error!("Insufficient disk space or permissions for backup: {}", e);
350                Err(BackupError::BackupFailed {
351                    message: format!("Cannot write to backup directory: {e}"),
352                })
353            }
354        }
355    }
356
357    async fn execute_pg_dump(&self, backup_path: &Path, database_name: &str) -> Result<()> {
358        debug!("Executing pg_dump to {}", backup_path.display());
359
360        let mut cmd = Command::new("pg_dump");
361        cmd.arg("--verbose")
362            .arg("--format=custom")
363            .arg("--compress=9")
364            .arg("--no-privileges")
365            .arg("--no-owner")
366            .arg("--dbname")
367            .arg(database_name)
368            .arg("--file")
369            .arg(backup_path);
370
371        // Add connection parameters from database URL
372        // This would need to parse the actual DATABASE_URL in production
373        cmd.arg("--host")
374            .arg("localhost")
375            .arg("--port")
376            .arg("5432")
377            .arg("--username")
378            .arg("postgres");
379
380        let output = cmd.output().map_err(|e| BackupError::BackupFailed {
381            message: format!("Failed to execute pg_dump: {e}"),
382        })?;
383
384        if !output.status.success() {
385            let error_msg = String::from_utf8_lossy(&output.stderr);
386            return Err(BackupError::BackupFailed {
387                message: format!("pg_dump failed: {error_msg}"),
388            });
389        }
390
391        debug!("pg_dump completed successfully");
392        Ok(())
393    }
394
395    async fn archive_wal_files(&self, start_lsn: &str, backup_path: &Path) -> Result<String> {
396        debug!("Archiving WAL files from LSN: {}", start_lsn);
397
398        // This is a simplified implementation
399        // In production, this would copy WAL files from pg_wal directory
400        let current_lsn = self.repository.get_current_wal_lsn().await?;
401
402        // Create empty archive file as placeholder
403        tokio::fs::write(backup_path, b"").await?;
404
405        Ok(current_lsn)
406    }
407
408    async fn calculate_file_checksum(&self, file_path: &Path) -> Result<String> {
409        use sha2::{Digest, Sha256};
410
411        let contents = fs::read(file_path).await?;
412        let mut hasher = Sha256::new();
413        hasher.update(&contents);
414        let result = hasher.finalize();
415        Ok(format!("{result:x}"))
416    }
417
    /// Replicate a completed backup to each configured replication target.
    ///
    /// NOTE(review): stub — it only logs each target's name; no data is
    /// actually transferred yet.
    async fn replicate_backup(&self, metadata: &BackupMetadata) -> Result<()> {
        debug!("Replicating backup: {}", metadata.id);

        for target in &self.config.replication_targets {
            info!("Replicating to target: {}", target.name);
            // This would implement actual replication logic
            // For now, just log the operation
        }

        Ok(())
    }
429
430    async fn delete_backup(&self, backup: &BackupMetadata) -> Result<()> {
431        debug!("Deleting backup: {}", backup.id);
432
433        // Delete the backup file
434        if backup.file_path.exists() {
435            fs::remove_file(&backup.file_path).await?;
436        }
437
438        // Mark as expired in metadata store
439        self.repository.mark_backup_expired(&backup.id).await?;
440
441        Ok(())
442    }
443}
444
445// Backup metadata store moved to repository module for better layer separation
446
/// Snapshot of backup health, produced by
/// `BackupManager::get_backup_statistics`, for monitoring and reporting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupStatistics {
    /// Total number of backups recorded in the metadata store.
    pub total_backups: u32,
    /// Backups with `Completed` status in the last 7 days.
    pub successful_backups_last_7_days: u32,
    /// Backups with `Failed` status in the last 7 days.
    pub failed_backups_last_7_days: u32,
    /// Combined compressed size of completed backups from the last 7 days.
    pub total_backup_size_bytes: u64,
    /// Start time of the most recent backup, if any exist.
    pub latest_backup_time: Option<DateTime<Utc>>,
    /// True when the latest backup started within the last 24 hours.
    pub backup_frequency_met: bool,
    /// Recovery Time Objective target, taken from configuration.
    pub rto_target_minutes: u32,
    /// Recovery Point Objective target, taken from configuration.
    pub rpo_target_minutes: u32,
}
458
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backup_config_default() {
        // Defaults expected from BackupConfig: 30-day retention,
        // 60-minute RTO, 5-minute RPO, encryption enabled.
        let config = BackupConfig::default();
        assert_eq!(
            (config.retention_days, config.rto_minutes, config.rpo_minutes),
            (30, 60, 5)
        );
        assert!(config.enable_encryption);
    }

    #[test]
    fn test_backup_metadata_creation() {
        // A freshly constructed metadata record should carry exactly the
        // values it was built with.
        let metadata = BackupMetadata {
            id: "test-backup".to_string(),
            backup_type: BackupType::Full,
            status: BackupStatus::InProgress,
            start_time: Utc::now(),
            end_time: None,
            size_bytes: 0,
            compressed_size_bytes: 0,
            file_path: std::path::PathBuf::from("/tmp/test.sql"),
            checksum: String::new(),
            database_name: "test_db".to_string(),
            wal_start_lsn: None,
            wal_end_lsn: None,
            encryption_enabled: true,
            replication_status: std::collections::HashMap::new(),
            verification_status: None,
        };

        assert!(metadata.encryption_enabled);
        assert!(matches!(metadata.backup_type, BackupType::Full));
        assert_eq!(metadata.id, "test-backup");
    }
}