1use super::{BackupConfig, BackupError, BackupMetadata, BackupStatus, BackupType, Result};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use sqlx::PgPool;
5use std::path::Path;
6use std::process::Command;
7use std::sync::Arc;
8use tokio::fs;
9use tracing::{debug, error, info, warn};
10use uuid::Uuid;
11
12pub struct BackupManager {
14    config: BackupConfig,
15    db_pool: Arc<PgPool>,
16    metadata_store: BackupMetadataStore,
17}
18
19impl BackupManager {
20    pub fn new(config: BackupConfig, db_pool: Arc<PgPool>) -> Self {
21        let metadata_store = BackupMetadataStore::new(db_pool.clone());
22        Self {
23            config,
24            db_pool,
25            metadata_store,
26        }
27    }
28
29    pub async fn initialize(&self) -> Result<()> {
31        info!("Initializing backup manager");
32
33        fs::create_dir_all(&self.config.backup_directory).await?;
35        fs::create_dir_all(&self.config.wal_archive_directory).await?;
36
37        self.metadata_store.initialize().await?;
39
40        self.verify_postgres_config().await?;
42
43        info!("Backup manager initialized successfully");
44        Ok(())
45    }
46
47    pub async fn create_full_backup(&self) -> Result<BackupMetadata> {
49        let backup_id = Uuid::new_v4().to_string();
50        let start_time = Utc::now();
51
52        info!("Starting full backup: {}", backup_id);
53
54        let backup_filename = format!(
55            "full_backup_{}_{}.sql.gz",
56            start_time.format("%Y%m%d_%H%M%S"),
57            backup_id
58        );
59        let backup_path = self.config.backup_directory.join(&backup_filename);
60
61        self.check_disk_space(&backup_path).await?;
63
64        let mut metadata = BackupMetadata {
65            id: backup_id.clone(),
66            backup_type: BackupType::Full,
67            status: BackupStatus::InProgress,
68            start_time,
69            end_time: None,
70            size_bytes: 0,
71            compressed_size_bytes: 0,
72            file_path: backup_path.clone(),
73            checksum: String::new(),
74            database_name: self.extract_database_name()?,
75            wal_start_lsn: None,
76            wal_end_lsn: None,
77            encryption_enabled: self.config.enable_encryption,
78            replication_status: std::collections::HashMap::new(),
79            verification_status: None,
80        };
81
82        self.metadata_store.store_metadata(&metadata).await?;
84
85        let start_lsn = self.get_current_wal_lsn().await?;
87        metadata.wal_start_lsn = Some(start_lsn);
88
89        match self
91            .execute_pg_dump(&backup_path, &metadata.database_name)
92            .await
93        {
94            Ok(_) => {
95                let end_time = Utc::now();
96                metadata.end_time = Some(end_time);
97                metadata.status = BackupStatus::Completed;
98
99                let end_lsn = self.get_current_wal_lsn().await?;
101                metadata.wal_end_lsn = Some(end_lsn);
102
103                let file_metadata = fs::metadata(&backup_path).await?;
105                metadata.compressed_size_bytes = file_metadata.len();
106                metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
107
108                self.metadata_store.update_metadata(&metadata).await?;
110
111                info!(
112                    "Full backup completed successfully: {} ({} bytes)",
113                    backup_id, metadata.compressed_size_bytes
114                );
115
116                if self.config.enable_replication {
118                    self.replicate_backup(&metadata).await?;
119                }
120
121                Ok(metadata)
122            }
123            Err(e) => {
124                metadata.status = BackupStatus::Failed;
125                metadata.end_time = Some(Utc::now());
126                self.metadata_store.update_metadata(&metadata).await?;
127
128                error!("Full backup failed: {}", e);
129                Err(e)
130            }
131        }
132    }
133
134    pub async fn create_incremental_backup(&self) -> Result<BackupMetadata> {
136        let backup_id = Uuid::new_v4().to_string();
137        let start_time = Utc::now();
138
139        info!("Starting incremental backup: {}", backup_id);
140
141        let last_backup = self.metadata_store.get_latest_backup().await?;
143        let start_lsn = match last_backup {
144            Some(backup) => {
145                if let Some(lsn) = backup.wal_end_lsn {
146                    lsn
147                } else {
148                    warn!("Last backup has no end LSN, using current LSN");
149                    self.get_current_wal_lsn().await.unwrap_or_default()
150                }
151            }
152            None => {
153                return Err(BackupError::BackupFailed {
154                    message: "No previous backup found for incremental backup".to_string(),
155                });
156            }
157        };
158
159        let backup_filename = format!(
160            "incremental_backup_{}_{}.tar.gz",
161            start_time.format("%Y%m%d_%H%M%S"),
162            backup_id
163        );
164        let backup_path = self.config.backup_directory.join(&backup_filename);
165
166        let mut metadata = BackupMetadata {
167            id: backup_id.clone(),
168            backup_type: BackupType::Incremental,
169            status: BackupStatus::InProgress,
170            start_time,
171            end_time: None,
172            size_bytes: 0,
173            compressed_size_bytes: 0,
174            file_path: backup_path.clone(),
175            checksum: String::new(),
176            database_name: self.extract_database_name()?,
177            wal_start_lsn: Some(start_lsn.clone()),
178            wal_end_lsn: None,
179            encryption_enabled: self.config.enable_encryption,
180            replication_status: std::collections::HashMap::new(),
181            verification_status: None,
182        };
183
184        self.metadata_store.store_metadata(&metadata).await?;
186
187        match self.archive_wal_files(&start_lsn, &backup_path).await {
189            Ok(end_lsn) => {
190                let end_time = Utc::now();
191                metadata.end_time = Some(end_time);
192                metadata.status = BackupStatus::Completed;
193                metadata.wal_end_lsn = Some(end_lsn);
194
195                if backup_path.exists() {
197                    let file_metadata = fs::metadata(&backup_path).await?;
198                    metadata.compressed_size_bytes = file_metadata.len();
199                    metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
200                } else {
201                    warn!("No new WAL files to archive since last backup");
202                }
203
204                self.metadata_store.update_metadata(&metadata).await?;
206
207                info!("Incremental backup completed successfully: {}", backup_id);
208                Ok(metadata)
209            }
210            Err(e) => {
211                metadata.status = BackupStatus::Failed;
212                metadata.end_time = Some(Utc::now());
213                self.metadata_store.update_metadata(&metadata).await?;
214
215                error!("Incremental backup failed: {}", e);
216                Err(e)
217            }
218        }
219    }
220
221    pub async fn cleanup_expired_backups(&self) -> Result<u32> {
223        info!(
224            "Starting backup cleanup with retention policy of {} days",
225            self.config.retention_days
226        );
227
228        let expired_backups = self
229            .metadata_store
230            .get_expired_backups(self.config.retention_days)
231            .await?;
232        let mut cleanup_count = 0;
233
234        for backup in expired_backups {
235            match self.delete_backup(&backup).await {
236                Ok(_) => {
237                    cleanup_count += 1;
238                    info!("Deleted expired backup: {}", backup.id);
239                }
240                Err(e) => {
241                    error!("Failed to delete expired backup {}: {}", backup.id, e);
242                }
243            }
244        }
245
246        info!("Cleanup completed: {} backups deleted", cleanup_count);
247        Ok(cleanup_count)
248    }
249
250    pub async fn get_backup_statistics(&self) -> Result<BackupStatistics> {
252        let total_backups = self.metadata_store.count_backups().await?;
253        let recent_backups = self.metadata_store.get_recent_backups(7).await?;
254
255        let successful_backups = recent_backups
256            .iter()
257            .filter(|b| b.status == BackupStatus::Completed)
258            .count();
259
260        let failed_backups = recent_backups
261            .iter()
262            .filter(|b| b.status == BackupStatus::Failed)
263            .count();
264
265        let total_backup_size = recent_backups
266            .iter()
267            .filter(|b| b.status == BackupStatus::Completed)
268            .map(|b| b.compressed_size_bytes)
269            .sum();
270
271        let latest_backup = self.metadata_store.get_latest_backup().await?;
272        let backup_frequency_met = if let Some(latest) = &latest_backup {
273            let hours_since_last = Utc::now()
274                .signed_duration_since(latest.start_time)
275                .num_hours();
276            hours_since_last <= 24 } else {
278            false
279        };
280
281        Ok(BackupStatistics {
282            total_backups,
283            successful_backups_last_7_days: successful_backups as u32,
284            failed_backups_last_7_days: failed_backups as u32,
285            total_backup_size_bytes: total_backup_size,
286            latest_backup_time: latest_backup.map(|b| b.start_time),
287            backup_frequency_met,
288            rto_target_minutes: self.config.rto_minutes,
289            rpo_target_minutes: self.config.rpo_minutes,
290        })
291    }
292
293    async fn verify_postgres_config(&self) -> Result<()> {
296        debug!("Verifying PostgreSQL configuration for backups");
297
298        let wal_level: String = sqlx::query_scalar("SHOW wal_level")
300            .fetch_one(self.db_pool.as_ref())
301            .await?;
302
303        if wal_level != "replica" && wal_level != "logical" {
304            return Err(BackupError::ConfigurationError {
305                message: format!("WAL level must be 'replica' or 'logical', found: {wal_level}"),
306            });
307        }
308
309        let archive_mode: String = sqlx::query_scalar("SHOW archive_mode")
311            .fetch_one(self.db_pool.as_ref())
312            .await?;
313
314        if archive_mode != "on" {
315            warn!("Archive mode is not enabled, continuous archiving will not work");
316        }
317
318        debug!("PostgreSQL configuration verified");
319        Ok(())
320    }
321
322    fn extract_database_name(&self) -> Result<String> {
323        Ok("codex_memory".to_string())
326    }
327
328    async fn get_current_wal_lsn(&self) -> Result<String> {
329        self.retry_database_operation(|| async {
330            let lsn: String = sqlx::query_scalar("SELECT pg_current_wal_lsn()")
331                .fetch_one(self.db_pool.as_ref())
332                .await?;
333            Ok(lsn)
334        })
335        .await
336    }
337
338    async fn retry_database_operation<T, F, Fut>(&self, operation: F) -> Result<T>
340    where
341        F: Fn() -> Fut,
342        Fut: std::future::Future<Output = Result<T>>,
343    {
344        let mut retries = 0;
345        let max_retries = 3;
346
347        loop {
348            match operation().await {
349                Ok(result) => return Ok(result),
350                Err(e) => {
351                    if retries >= max_retries {
352                        return Err(e);
353                    }
354
355                    retries += 1;
356                    let delay = std::time::Duration::from_millis(100 * (1 << retries));
357                    warn!(
358                        "Database operation failed (attempt {}), retrying in {:?}: {}",
359                        retries, delay, e
360                    );
361                    tokio::time::sleep(delay).await;
362                }
363            }
364        }
365    }
366
367    async fn check_disk_space(&self, backup_path: &Path) -> Result<()> {
369        let backup_dir = backup_path.parent().unwrap_or_else(|| Path::new("/"));
370
371        let test_file = backup_dir.join(".backup_space_test");
373        match fs::write(&test_file, b"test").await {
374            Ok(_) => {
375                let _ = fs::remove_file(&test_file).await;
376                debug!("Disk space check passed for {}", backup_dir.display());
377                Ok(())
378            }
379            Err(e) => {
380                error!("Insufficient disk space or permissions for backup: {}", e);
381                Err(BackupError::BackupFailed {
382                    message: format!("Cannot write to backup directory: {e}"),
383                })
384            }
385        }
386    }
387
388    async fn execute_pg_dump(&self, backup_path: &Path, database_name: &str) -> Result<()> {
389        debug!("Executing pg_dump to {}", backup_path.display());
390
391        let mut cmd = Command::new("pg_dump");
392        cmd.arg("--verbose")
393            .arg("--format=custom")
394            .arg("--compress=9")
395            .arg("--no-privileges")
396            .arg("--no-owner")
397            .arg("--dbname")
398            .arg(database_name)
399            .arg("--file")
400            .arg(backup_path);
401
402        cmd.arg("--host")
405            .arg("localhost")
406            .arg("--port")
407            .arg("5432")
408            .arg("--username")
409            .arg("postgres");
410
411        let output = cmd.output().map_err(|e| BackupError::BackupFailed {
412            message: format!("Failed to execute pg_dump: {e}"),
413        })?;
414
415        if !output.status.success() {
416            let error_msg = String::from_utf8_lossy(&output.stderr);
417            return Err(BackupError::BackupFailed {
418                message: format!("pg_dump failed: {error_msg}"),
419            });
420        }
421
422        debug!("pg_dump completed successfully");
423        Ok(())
424    }
425
426    async fn archive_wal_files(&self, start_lsn: &str, backup_path: &Path) -> Result<String> {
427        debug!("Archiving WAL files from LSN: {}", start_lsn);
428
429        let current_lsn = self.get_current_wal_lsn().await?;
432
433        tokio::fs::write(backup_path, b"").await?;
435
436        Ok(current_lsn)
437    }
438
439    async fn calculate_file_checksum(&self, file_path: &Path) -> Result<String> {
440        use sha2::{Digest, Sha256};
441
442        let contents = fs::read(file_path).await?;
443        let mut hasher = Sha256::new();
444        hasher.update(&contents);
445        let result = hasher.finalize();
446        Ok(format!("{result:x}"))
447    }
448
449    async fn replicate_backup(&self, metadata: &BackupMetadata) -> Result<()> {
450        debug!("Replicating backup: {}", metadata.id);
451
452        for target in &self.config.replication_targets {
453            info!("Replicating to target: {}", target.name);
454            }
457
458        Ok(())
459    }
460
461    async fn delete_backup(&self, backup: &BackupMetadata) -> Result<()> {
462        debug!("Deleting backup: {}", backup.id);
463
464        if backup.file_path.exists() {
466            fs::remove_file(&backup.file_path).await?;
467        }
468
469        self.metadata_store.mark_backup_expired(&backup.id).await?;
471
472        Ok(())
473    }
474}
475
476struct BackupMetadataStore {
478    db_pool: Arc<PgPool>,
479}
480
481impl BackupMetadataStore {
482    fn new(db_pool: Arc<PgPool>) -> Self {
483        Self { db_pool }
484    }
485
486    async fn initialize(&self) -> Result<()> {
487        debug!("Initializing backup metadata store");
488
489        sqlx::query(
491            r#"
492            CREATE TABLE IF NOT EXISTS backup_metadata (
493                id VARCHAR PRIMARY KEY,
494                backup_type VARCHAR NOT NULL,
495                status VARCHAR NOT NULL,
496                start_time TIMESTAMPTZ NOT NULL,
497                end_time TIMESTAMPTZ,
498                size_bytes BIGINT DEFAULT 0,
499                compressed_size_bytes BIGINT DEFAULT 0,
500                file_path TEXT NOT NULL,
501                checksum VARCHAR,
502                database_name VARCHAR NOT NULL,
503                wal_start_lsn VARCHAR,
504                wal_end_lsn VARCHAR,
505                encryption_enabled BOOLEAN DEFAULT false,
506                metadata JSONB DEFAULT '{}',
507                created_at TIMESTAMPTZ DEFAULT NOW()
508            )
509        "#,
510        )
511        .execute(self.db_pool.as_ref())
512        .await?;
513
514        debug!("Backup metadata store initialized");
515        Ok(())
516    }
517
518    async fn store_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
519        sqlx::query(
520            r#"
521            INSERT INTO backup_metadata (
522                id, backup_type, status, start_time, end_time, 
523                size_bytes, compressed_size_bytes, file_path, checksum,
524                database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
525            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
526        "#,
527        )
528        .bind(&metadata.id)
529        .bind(format!("{:?}", metadata.backup_type))
530        .bind(format!("{:?}", metadata.status))
531        .bind(metadata.start_time)
532        .bind(metadata.end_time)
533        .bind(metadata.size_bytes as i64)
534        .bind(metadata.compressed_size_bytes as i64)
535        .bind(metadata.file_path.to_string_lossy().as_ref())
536        .bind(&metadata.checksum)
537        .bind(&metadata.database_name)
538        .bind(&metadata.wal_start_lsn)
539        .bind(&metadata.wal_end_lsn)
540        .bind(metadata.encryption_enabled)
541        .execute(self.db_pool.as_ref())
542        .await?;
543
544        Ok(())
545    }
546
547    async fn update_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
548        sqlx::query(
549            r#"
550            UPDATE backup_metadata SET
551                status = $2, end_time = $3, size_bytes = $4,
552                compressed_size_bytes = $5, checksum = $6,
553                wal_end_lsn = $7
554            WHERE id = $1
555        "#,
556        )
557        .bind(&metadata.id)
558        .bind(format!("{:?}", metadata.status))
559        .bind(metadata.end_time)
560        .bind(metadata.size_bytes as i64)
561        .bind(metadata.compressed_size_bytes as i64)
562        .bind(&metadata.checksum)
563        .bind(&metadata.wal_end_lsn)
564        .execute(self.db_pool.as_ref())
565        .await?;
566
567        Ok(())
568    }
569
570    async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>> {
571        let row = sqlx::query(
572            r#"
573            SELECT id, backup_type, status, start_time, end_time,
574                   size_bytes, compressed_size_bytes, file_path, checksum,
575                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
576            FROM backup_metadata 
577            WHERE status = 'Completed'
578            ORDER BY start_time DESC 
579            LIMIT 1
580        "#,
581        )
582        .fetch_optional(self.db_pool.as_ref())
583        .await?;
584
585        if let Some(row) = row {
586            Ok(Some(self.row_to_metadata(row)?))
587        } else {
588            Ok(None)
589        }
590    }
591
592    async fn get_expired_backups(&self, retention_days: u32) -> Result<Vec<BackupMetadata>> {
593        let rows = sqlx::query(
594            r#"
595            SELECT id, backup_type, status, start_time, end_time,
596                   size_bytes, compressed_size_bytes, file_path, checksum,
597                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
598            FROM backup_metadata 
599            WHERE start_time < NOW() - INTERVAL '%d days'
600            AND status != 'Expired'
601        "#,
602        )
603        .bind(retention_days as i32)
604        .fetch_all(self.db_pool.as_ref())
605        .await?;
606
607        let mut backups = Vec::new();
608        for row in rows {
609            backups.push(self.row_to_metadata(row)?);
610        }
611
612        Ok(backups)
613    }
614
615    async fn get_recent_backups(&self, days: u32) -> Result<Vec<BackupMetadata>> {
616        let rows = sqlx::query(
617            r#"
618            SELECT id, backup_type, status, start_time, end_time,
619                   size_bytes, compressed_size_bytes, file_path, checksum,
620                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
621            FROM backup_metadata 
622            WHERE start_time > NOW() - INTERVAL '%d days'
623            ORDER BY start_time DESC
624        "#,
625        )
626        .bind(days as i32)
627        .fetch_all(self.db_pool.as_ref())
628        .await?;
629
630        let mut backups = Vec::new();
631        for row in rows {
632            backups.push(self.row_to_metadata(row)?);
633        }
634
635        Ok(backups)
636    }
637
638    async fn count_backups(&self) -> Result<u32> {
639        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM backup_metadata")
640            .fetch_one(self.db_pool.as_ref())
641            .await?;
642
643        Ok(count as u32)
644    }
645
646    async fn mark_backup_expired(&self, backup_id: &str) -> Result<()> {
647        sqlx::query("UPDATE backup_metadata SET status = 'Expired' WHERE id = $1")
648            .bind(backup_id)
649            .execute(self.db_pool.as_ref())
650            .await?;
651
652        Ok(())
653    }
654
655    fn row_to_metadata(&self, row: sqlx::postgres::PgRow) -> Result<BackupMetadata> {
656        use sqlx::Row;
657
658        let backup_type_str: String = row.try_get("backup_type")?;
659        let backup_type = match backup_type_str.as_str() {
660            "Full" => BackupType::Full,
661            "Incremental" => BackupType::Incremental,
662            "Differential" => BackupType::Differential,
663            "WalArchive" => BackupType::WalArchive,
664            _ => BackupType::Full,
665        };
666
667        let status_str: String = row.try_get("status")?;
668        let status = match status_str.as_str() {
669            "InProgress" => BackupStatus::InProgress,
670            "Completed" => BackupStatus::Completed,
671            "Failed" => BackupStatus::Failed,
672            "Expired" => BackupStatus::Expired,
673            "Archived" => BackupStatus::Archived,
674            _ => BackupStatus::Failed,
675        };
676
677        Ok(BackupMetadata {
678            id: row.try_get("id")?,
679            backup_type,
680            status,
681            start_time: row.try_get("start_time")?,
682            end_time: row.try_get("end_time")?,
683            size_bytes: row.try_get::<i64, _>("size_bytes")? as u64,
684            compressed_size_bytes: row.try_get::<i64, _>("compressed_size_bytes")? as u64,
685            file_path: std::path::PathBuf::from(row.try_get::<String, _>("file_path")?),
686            checksum: row.try_get("checksum")?,
687            database_name: row.try_get("database_name")?,
688            wal_start_lsn: row.try_get("wal_start_lsn")?,
689            wal_end_lsn: row.try_get("wal_end_lsn")?,
690            encryption_enabled: row.try_get("encryption_enabled")?,
691            replication_status: std::collections::HashMap::new(),
692            verification_status: None,
693        })
694    }
695}
696
697#[derive(Debug, Clone, Serialize, Deserialize)]
698pub struct BackupStatistics {
699    pub total_backups: u32,
700    pub successful_backups_last_7_days: u32,
701    pub failed_backups_last_7_days: u32,
702    pub total_backup_size_bytes: u64,
703    pub latest_backup_time: Option<DateTime<Utc>>,
704    pub backup_frequency_met: bool,
705    pub rto_target_minutes: u32,
706    pub rpo_target_minutes: u32,
707}
708
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the expected retention period,
    /// recovery targets and encryption flag.
    #[test]
    fn test_backup_config_default() {
        let defaults = BackupConfig::default();

        assert_eq!(defaults.retention_days, 30);
        assert_eq!(defaults.rto_minutes, 60);
        assert_eq!(defaults.rpo_minutes, 5);
        assert!(defaults.enable_encryption);
    }

    /// A metadata value built by hand keeps the fields it was given.
    #[test]
    fn test_backup_metadata_creation() {
        let started_at = Utc::now();
        let dump_path = std::path::PathBuf::from("/tmp/test.sql");

        let metadata = BackupMetadata {
            id: "test-backup".to_string(),
            backup_type: BackupType::Full,
            status: BackupStatus::InProgress,
            start_time: started_at,
            end_time: None,
            size_bytes: 0,
            compressed_size_bytes: 0,
            file_path: dump_path,
            checksum: String::new(),
            database_name: "test_db".to_string(),
            wal_start_lsn: None,
            wal_end_lsn: None,
            encryption_enabled: true,
            replication_status: std::collections::HashMap::new(),
            verification_status: None,
        };

        assert_eq!(metadata.id, "test-backup");
        assert!(matches!(metadata.backup_type, BackupType::Full));
        assert!(metadata.encryption_enabled);
    }
}