codex_memory/backup/
backup_verification.rs

1use super::{BackupConfig, BackupError, BackupMetadata, Result};
2use chrono::Utc;
3use serde::{Deserialize, Serialize};
4use sqlx::PgPool;
5use std::path::Path;
6use std::process::Command;
7use std::sync::Arc;
8use tokio::fs;
9use tracing::{debug, error, info, warn};
10
11/// Backup verification system for ensuring backup integrity and restorability
12pub struct BackupVerifier {
13    config: BackupConfig,
14    db_pool: Arc<PgPool>,
15    verification_stats: Arc<tokio::sync::RwLock<VerificationStats>>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct VerificationStats {
20    pub total_verifications: u64,
21    pub successful_verifications: u64,
22    pub failed_verifications: u64,
23    pub last_verification_time: Option<chrono::DateTime<chrono::Utc>>,
24    pub average_verification_duration_seconds: f64,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct VerificationResult {
29    pub backup_id: String,
30    pub verification_time: chrono::DateTime<chrono::Utc>,
31    pub integrity_check_passed: bool,
32    pub restoration_test_passed: bool,
33    pub checksum_verified: bool,
34    pub file_structure_valid: bool,
35    pub database_consistency_verified: bool,
36    pub duration_seconds: u32,
37    pub issues_found: Vec<String>,
38    pub error_message: Option<String>,
39}
40
41impl BackupVerifier {
42    pub fn new(config: BackupConfig, db_pool: Arc<PgPool>) -> Self {
43        let verification_stats = Arc::new(tokio::sync::RwLock::new(VerificationStats {
44            total_verifications: 0,
45            successful_verifications: 0,
46            failed_verifications: 0,
47            last_verification_time: None,
48            average_verification_duration_seconds: 0.0,
49        }));
50
51        Self {
52            config,
53            db_pool,
54            verification_stats,
55        }
56    }
57
58    /// Initialize the backup verification system
59    pub async fn initialize(&self) -> Result<()> {
60        info!("Initializing backup verification system");
61
62        // Verify verification tools are available
63        self.verify_tools().await?;
64
65        // Create verification workspace
66        let verification_workspace = self.config.backup_directory.join("verification");
67        fs::create_dir_all(&verification_workspace).await?;
68
69        info!("Backup verification system initialized");
70        Ok(())
71    }
72
73    /// Verify a specific backup's integrity and restorability
74    pub async fn verify_backup(&self, backup: &BackupMetadata) -> Result<VerificationResult> {
75        info!("Starting verification for backup: {}", backup.id);
76
77        let start_time = Utc::now();
78        let mut result = VerificationResult {
79            backup_id: backup.id.clone(),
80            verification_time: start_time,
81            integrity_check_passed: false,
82            restoration_test_passed: false,
83            checksum_verified: false,
84            file_structure_valid: false,
85            database_consistency_verified: false,
86            duration_seconds: 0,
87            issues_found: Vec::new(),
88            error_message: None,
89        };
90
91        // Step 1: Verify file existence
92        if !self.verify_backup_file_exists(backup, &mut result).await? {
93            return self
94                .finalize_verification_result(result, start_time, false)
95                .await;
96        }
97
98        // Step 2: Verify file checksum
99        if !self.verify_backup_checksum(backup, &mut result).await? {
100            return self
101                .finalize_verification_result(result, start_time, false)
102                .await;
103        }
104
105        // Step 3: Verify file structure
106        if !self.verify_backup_structure(backup, &mut result).await? {
107            return self
108                .finalize_verification_result(result, start_time, false)
109                .await;
110        }
111
112        // Step 4: Perform restoration test
113        if !self.perform_restoration_test(backup, &mut result).await? {
114            return self
115                .finalize_verification_result(result, start_time, false)
116                .await;
117        }
118
119        // Step 5: Verify database consistency
120        if !self
121            .verify_database_consistency(backup, &mut result)
122            .await?
123        {
124            return self
125                .finalize_verification_result(result, start_time, false)
126                .await;
127        }
128
129        // All checks passed
130        info!("Backup verification completed successfully: {}", backup.id);
131        self.finalize_verification_result(result, start_time, true)
132            .await
133    }
134
135    /// Verify all backups in the system
136    pub async fn verify_all_backups(&self) -> Result<Vec<VerificationResult>> {
137        info!("Starting verification of all backups");
138
139        let backups = self.get_all_backups().await?;
140        let mut results = Vec::new();
141
142        for backup in backups {
143            match self.verify_backup(&backup).await {
144                Ok(result) => results.push(result),
145                Err(e) => {
146                    error!("Failed to verify backup {}: {}", backup.id, e);
147                    // Continue with other backups
148                }
149            }
150        }
151
152        info!("Completed verification of {} backups", results.len());
153        Ok(results)
154    }
155
156    /// Run automated verification based on schedule
157    pub async fn run_scheduled_verification(&self) -> Result<u32> {
158        info!("Running scheduled backup verification");
159
160        let backups_to_verify = self.get_backups_needing_verification().await?;
161        let mut verified_count = 0;
162
163        for backup in backups_to_verify {
164            match self.verify_backup(&backup).await {
165                Ok(result) => {
166                    // Store verification result
167                    self.store_verification_result(&result).await?;
168
169                    if result.integrity_check_passed && result.restoration_test_passed {
170                        verified_count += 1;
171                    }
172                }
173                Err(e) => {
174                    error!(
175                        "Scheduled verification failed for backup {}: {}",
176                        backup.id, e
177                    );
178                }
179            }
180        }
181
182        info!(
183            "Scheduled verification completed: {} backups verified",
184            verified_count
185        );
186        Ok(verified_count)
187    }
188
189    /// Get verification statistics
190    pub async fn get_verification_stats(&self) -> VerificationStats {
191        self.verification_stats.read().await.clone()
192    }
193
194    /// Get verification history for a specific backup
195    pub async fn get_verification_history(
196        &self,
197        backup_id: &str,
198    ) -> Result<Vec<VerificationResult>> {
199        debug!("Getting verification history for backup: {}", backup_id);
200
201        // This would query the verification results from the database
202        // For now, return empty vector
203        Ok(Vec::new())
204    }
205
206    // Private helper methods
207
208    async fn verify_tools(&self) -> Result<()> {
209        debug!("Verifying backup verification tools");
210
211        // Check if pg_verifybackup is available (PostgreSQL 13+)
212        match Command::new("pg_verifybackup").arg("--version").output() {
213            Ok(output) if output.status.success() => {
214                debug!("pg_verifybackup is available");
215            }
216            _ => {
217                warn!("pg_verifybackup not available, using alternative verification methods");
218            }
219        }
220
221        // Check if pg_dump is available
222        let output = Command::new("pg_dump")
223            .arg("--version")
224            .output()
225            .map_err(|e| BackupError::ConfigurationError {
226                message: format!("pg_dump not found: {e}"),
227            })?;
228
229        if !output.status.success() {
230            return Err(BackupError::ConfigurationError {
231                message: "pg_dump is not working properly".to_string(),
232            });
233        }
234
235        debug!("Backup verification tools verified");
236        Ok(())
237    }
238
239    async fn verify_backup_file_exists(
240        &self,
241        backup: &BackupMetadata,
242        result: &mut VerificationResult,
243    ) -> Result<bool> {
244        debug!(
245            "Verifying backup file exists: {}",
246            backup.file_path.display()
247        );
248
249        if !backup.file_path.exists() {
250            result
251                .issues_found
252                .push("Backup file does not exist".to_string());
253            result.error_message = Some(format!(
254                "Backup file not found: {}",
255                backup.file_path.display()
256            ));
257            return Ok(false);
258        }
259
260        // Check file is readable
261        match fs::metadata(&backup.file_path).await {
262            Ok(metadata) => {
263                if metadata.len() == 0 {
264                    result.issues_found.push("Backup file is empty".to_string());
265                    return Ok(false);
266                }
267                debug!("Backup file exists and is {} bytes", metadata.len());
268            }
269            Err(e) => {
270                result
271                    .issues_found
272                    .push(format!("Cannot read backup file metadata: {e}"));
273                return Ok(false);
274            }
275        }
276
277        Ok(true)
278    }
279
280    async fn verify_backup_checksum(
281        &self,
282        backup: &BackupMetadata,
283        result: &mut VerificationResult,
284    ) -> Result<bool> {
285        debug!("Verifying backup checksum");
286
287        if backup.checksum.is_empty() {
288            result
289                .issues_found
290                .push("No checksum available for verification".to_string());
291            return Ok(true); // Not a failure, but we can't verify
292        }
293
294        let calculated_checksum = self.calculate_file_checksum(&backup.file_path).await?;
295
296        if calculated_checksum != backup.checksum {
297            result
298                .issues_found
299                .push("Checksum mismatch detected".to_string());
300            result.error_message = Some(format!(
301                "Expected checksum {}, got {}",
302                backup.checksum, calculated_checksum
303            ));
304            return Ok(false);
305        }
306
307        result.checksum_verified = true;
308        debug!("Backup checksum verified successfully");
309        Ok(true)
310    }
311
312    async fn verify_backup_structure(
313        &self,
314        backup: &BackupMetadata,
315        result: &mut VerificationResult,
316    ) -> Result<bool> {
317        debug!("Verifying backup file structure");
318
319        // For PostgreSQL custom format backups, we can use pg_restore --list to verify structure
320        let mut cmd = Command::new("pg_restore");
321        cmd.arg("--list").arg(&backup.file_path);
322
323        let output = match cmd.output() {
324            Ok(output) => output,
325            Err(e) => {
326                result
327                    .issues_found
328                    .push(format!("Failed to execute pg_restore --list: {e}"));
329                return Ok(false);
330            }
331        };
332
333        if !output.status.success() {
334            let error_msg = String::from_utf8_lossy(&output.stderr);
335            result
336                .issues_found
337                .push(format!("pg_restore --list failed: {error_msg}"));
338            return Ok(false);
339        }
340
341        // Verify the output contains expected database objects
342        let list_output = String::from_utf8_lossy(&output.stdout);
343        if list_output.is_empty() {
344            result
345                .issues_found
346                .push("Backup appears to be empty or corrupted".to_string());
347            return Ok(false);
348        }
349
350        // Check for essential database objects (tables, sequences, etc.)
351        let expected_objects = ["TABLE", "SEQUENCE", "INDEX"];
352        for obj_type in &expected_objects {
353            if !list_output.contains(obj_type) {
354                result
355                    .issues_found
356                    .push(format!("Missing expected object type: {obj_type}"));
357            }
358        }
359
360        result.file_structure_valid = true;
361        debug!("Backup file structure verified successfully");
362        Ok(true)
363    }
364
365    async fn perform_restoration_test(
366        &self,
367        backup: &BackupMetadata,
368        result: &mut VerificationResult,
369    ) -> Result<bool> {
370        debug!("Performing restoration test");
371
372        // Create temporary database for restoration test
373        let test_db_name = format!("backup_test_{}", uuid::Uuid::new_v4().simple());
374
375        // Create test database
376        if !self.create_test_database(&test_db_name).await? {
377            result
378                .issues_found
379                .push("Failed to create test database".to_string());
380            return Ok(false);
381        }
382
383        // Restore backup to test database
384        let restoration_success = self
385            .restore_to_test_database(backup, &test_db_name, result)
386            .await?;
387
388        // Clean up test database
389        if let Err(e) = self.drop_test_database(&test_db_name).await {
390            warn!("Failed to clean up test database {}: {}", test_db_name, e);
391        }
392
393        if restoration_success {
394            result.restoration_test_passed = true;
395            debug!("Restoration test passed successfully");
396        }
397
398        Ok(restoration_success)
399    }
400
401    async fn verify_database_consistency(
402        &self,
403        _backup: &BackupMetadata,
404        result: &mut VerificationResult,
405    ) -> Result<bool> {
406        debug!("Verifying database consistency");
407
408        // For now, we'll assume consistency if the restoration test passed
409        // In a full implementation, this would run additional consistency checks
410        if result.restoration_test_passed {
411            result.database_consistency_verified = true;
412            debug!("Database consistency verified");
413            Ok(true)
414        } else {
415            result
416                .issues_found
417                .push("Cannot verify consistency - restoration test failed".to_string());
418            Ok(false)
419        }
420    }
421
422    async fn create_test_database(&self, db_name: &str) -> Result<bool> {
423        debug!("Creating test database: {}", db_name);
424
425        let query = format!("CREATE DATABASE {db_name}");
426        match sqlx::query(&query).execute(self.db_pool.as_ref()).await {
427            Ok(_) => {
428                debug!("Test database created successfully");
429                Ok(true)
430            }
431            Err(e) => {
432                error!("Failed to create test database: {}", e);
433                Ok(false)
434            }
435        }
436    }
437
438    async fn restore_to_test_database(
439        &self,
440        backup: &BackupMetadata,
441        test_db_name: &str,
442        result: &mut VerificationResult,
443    ) -> Result<bool> {
444        debug!("Restoring backup to test database: {}", test_db_name);
445
446        let mut cmd = Command::new("pg_restore");
447        cmd.arg("--verbose")
448            .arg("--no-privileges")
449            .arg("--no-owner")
450            .arg("--dbname")
451            .arg(test_db_name)
452            .arg(&backup.file_path);
453
454        let output = match cmd.output() {
455            Ok(output) => output,
456            Err(e) => {
457                result
458                    .issues_found
459                    .push(format!("Failed to execute pg_restore: {e}"));
460                return Ok(false);
461            }
462        };
463
464        if !output.status.success() {
465            let error_msg = String::from_utf8_lossy(&output.stderr);
466            result
467                .issues_found
468                .push(format!("pg_restore to test database failed: {error_msg}"));
469            return Ok(false);
470        }
471
472        debug!("Backup restored to test database successfully");
473        Ok(true)
474    }
475
476    async fn drop_test_database(&self, db_name: &str) -> Result<()> {
477        debug!("Dropping test database: {}", db_name);
478
479        let query = format!("DROP DATABASE IF EXISTS {db_name}");
480        sqlx::query(&query).execute(self.db_pool.as_ref()).await?;
481
482        debug!("Test database dropped successfully");
483        Ok(())
484    }
485
486    async fn calculate_file_checksum(&self, file_path: &Path) -> Result<String> {
487        use sha2::{Digest, Sha256};
488
489        let contents = fs::read(file_path).await?;
490        let mut hasher = Sha256::new();
491        hasher.update(&contents);
492        let result = hasher.finalize();
493        Ok(format!("{result:x}"))
494    }
495
496    async fn finalize_verification_result(
497        &self,
498        mut result: VerificationResult,
499        start_time: chrono::DateTime<chrono::Utc>,
500        success: bool,
501    ) -> Result<VerificationResult> {
502        let duration = Utc::now().signed_duration_since(start_time);
503        result.duration_seconds = duration.num_seconds() as u32;
504
505        // Set overall flags based on individual checks
506        result.integrity_check_passed = result.checksum_verified && result.file_structure_valid;
507
508        // Update statistics
509        {
510            let mut stats = self.verification_stats.write().await;
511            stats.total_verifications += 1;
512            if success {
513                stats.successful_verifications += 1;
514            } else {
515                stats.failed_verifications += 1;
516            }
517            stats.last_verification_time = Some(Utc::now());
518
519            // Update average duration
520            let total_duration = stats.average_verification_duration_seconds
521                * (stats.total_verifications - 1) as f64;
522            stats.average_verification_duration_seconds = (total_duration
523                + result.duration_seconds as f64)
524                / stats.total_verifications as f64;
525        }
526
527        if success {
528            info!(
529                "Backup verification successful: {} ({}s)",
530                result.backup_id, result.duration_seconds
531            );
532        } else {
533            error!(
534                "Backup verification failed: {} ({}s): {:?}",
535                result.backup_id, result.duration_seconds, result.issues_found
536            );
537        }
538
539        Ok(result)
540    }
541
542    async fn get_all_backups(&self) -> Result<Vec<BackupMetadata>> {
543        // This would query the backup metadata store
544        // For now, return empty vector
545        Ok(Vec::new())
546    }
547
548    async fn get_backups_needing_verification(&self) -> Result<Vec<BackupMetadata>> {
549        // This would query for backups that haven't been verified recently
550        // or have never been verified
551        Ok(Vec::new())
552    }
553
554    async fn store_verification_result(&self, result: &VerificationResult) -> Result<()> {
555        debug!(
556            "Storing verification result for backup: {}",
557            result.backup_id
558        );
559
560        // This would store the verification result in the database
561        // For now, just log it
562        info!(
563            "Verification result: {} - Success: {}, Duration: {}s",
564            result.backup_id,
565            result.integrity_check_passed && result.restoration_test_passed,
566            result.duration_seconds
567        );
568
569        Ok(())
570    }
571}
572
573impl Default for VerificationStats {
574    fn default() -> Self {
575        Self {
576            total_verifications: 0,
577            successful_verifications: 0,
578            failed_verifications: 0,
579            last_verification_time: None,
580            average_verification_duration_seconds: 0.0,
581        }
582    }
583}
584
#[cfg(test)]
mod tests {
    use super::*;

    /// Default statistics start with every counter and the average at zero.
    #[test]
    fn test_verification_stats_default() {
        let VerificationStats {
            total_verifications,
            successful_verifications,
            failed_verifications,
            average_verification_duration_seconds,
            ..
        } = VerificationStats::default();

        assert_eq!(total_verifications, 0);
        assert_eq!(successful_verifications, 0);
        assert_eq!(failed_verifications, 0);
        assert_eq!(average_verification_duration_seconds, 0.0);
    }

    /// A freshly built result keeps its id, has no passed flags and no issues.
    #[test]
    fn test_verification_result_creation() {
        let fresh = VerificationResult {
            backup_id: String::from("test-backup"),
            verification_time: Utc::now(),
            integrity_check_passed: false,
            restoration_test_passed: false,
            checksum_verified: false,
            file_structure_valid: false,
            database_consistency_verified: false,
            duration_seconds: 0,
            issues_found: Vec::new(),
            error_message: None,
        };

        assert_eq!(fresh.backup_id, "test-backup");
        assert!(!fresh.integrity_check_passed);
        assert!(fresh.issues_found.is_empty());
    }
}