1use super::{BackupConfig, BackupError, BackupMetadata, Result};
2use chrono::Utc;
3use serde::{Deserialize, Serialize};
4use sqlx::PgPool;
5use std::path::Path;
6use std::process::Command;
7use std::sync::Arc;
8use tokio::fs;
9use tracing::{debug, error, info, warn};
10
11pub struct BackupVerifier {
13 config: BackupConfig,
14 db_pool: Arc<PgPool>,
15 verification_stats: Arc<tokio::sync::RwLock<VerificationStats>>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct VerificationStats {
20 pub total_verifications: u64,
21 pub successful_verifications: u64,
22 pub failed_verifications: u64,
23 pub last_verification_time: Option<chrono::DateTime<chrono::Utc>>,
24 pub average_verification_duration_seconds: f64,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct VerificationResult {
29 pub backup_id: String,
30 pub verification_time: chrono::DateTime<chrono::Utc>,
31 pub integrity_check_passed: bool,
32 pub restoration_test_passed: bool,
33 pub checksum_verified: bool,
34 pub file_structure_valid: bool,
35 pub database_consistency_verified: bool,
36 pub duration_seconds: u32,
37 pub issues_found: Vec<String>,
38 pub error_message: Option<String>,
39}
40
41impl BackupVerifier {
42 pub fn new(config: BackupConfig, db_pool: Arc<PgPool>) -> Self {
43 let verification_stats = Arc::new(tokio::sync::RwLock::new(VerificationStats {
44 total_verifications: 0,
45 successful_verifications: 0,
46 failed_verifications: 0,
47 last_verification_time: None,
48 average_verification_duration_seconds: 0.0,
49 }));
50
51 Self {
52 config,
53 db_pool,
54 verification_stats,
55 }
56 }
57
58 pub async fn initialize(&self) -> Result<()> {
60 info!("Initializing backup verification system");
61
62 self.verify_tools().await?;
64
65 let verification_workspace = self.config.backup_directory.join("verification");
67 fs::create_dir_all(&verification_workspace).await?;
68
69 info!("Backup verification system initialized");
70 Ok(())
71 }
72
73 pub async fn verify_backup(&self, backup: &BackupMetadata) -> Result<VerificationResult> {
75 info!("Starting verification for backup: {}", backup.id);
76
77 let start_time = Utc::now();
78 let mut result = VerificationResult {
79 backup_id: backup.id.clone(),
80 verification_time: start_time,
81 integrity_check_passed: false,
82 restoration_test_passed: false,
83 checksum_verified: false,
84 file_structure_valid: false,
85 database_consistency_verified: false,
86 duration_seconds: 0,
87 issues_found: Vec::new(),
88 error_message: None,
89 };
90
91 if !self.verify_backup_file_exists(backup, &mut result).await? {
93 return self
94 .finalize_verification_result(result, start_time, false)
95 .await;
96 }
97
98 if !self.verify_backup_checksum(backup, &mut result).await? {
100 return self
101 .finalize_verification_result(result, start_time, false)
102 .await;
103 }
104
105 if !self.verify_backup_structure(backup, &mut result).await? {
107 return self
108 .finalize_verification_result(result, start_time, false)
109 .await;
110 }
111
112 if !self.perform_restoration_test(backup, &mut result).await? {
114 return self
115 .finalize_verification_result(result, start_time, false)
116 .await;
117 }
118
119 if !self
121 .verify_database_consistency(backup, &mut result)
122 .await?
123 {
124 return self
125 .finalize_verification_result(result, start_time, false)
126 .await;
127 }
128
129 info!("Backup verification completed successfully: {}", backup.id);
131 self.finalize_verification_result(result, start_time, true)
132 .await
133 }
134
135 pub async fn verify_all_backups(&self) -> Result<Vec<VerificationResult>> {
137 info!("Starting verification of all backups");
138
139 let backups = self.get_all_backups().await?;
140 let mut results = Vec::new();
141
142 for backup in backups {
143 match self.verify_backup(&backup).await {
144 Ok(result) => results.push(result),
145 Err(e) => {
146 error!("Failed to verify backup {}: {}", backup.id, e);
147 }
149 }
150 }
151
152 info!("Completed verification of {} backups", results.len());
153 Ok(results)
154 }
155
156 pub async fn run_scheduled_verification(&self) -> Result<u32> {
158 info!("Running scheduled backup verification");
159
160 let backups_to_verify = self.get_backups_needing_verification().await?;
161 let mut verified_count = 0;
162
163 for backup in backups_to_verify {
164 match self.verify_backup(&backup).await {
165 Ok(result) => {
166 self.store_verification_result(&result).await?;
168
169 if result.integrity_check_passed && result.restoration_test_passed {
170 verified_count += 1;
171 }
172 }
173 Err(e) => {
174 error!(
175 "Scheduled verification failed for backup {}: {}",
176 backup.id, e
177 );
178 }
179 }
180 }
181
182 info!(
183 "Scheduled verification completed: {} backups verified",
184 verified_count
185 );
186 Ok(verified_count)
187 }
188
189 pub async fn get_verification_stats(&self) -> VerificationStats {
191 self.verification_stats.read().await.clone()
192 }
193
194 pub async fn get_verification_history(
196 &self,
197 backup_id: &str,
198 ) -> Result<Vec<VerificationResult>> {
199 debug!("Getting verification history for backup: {}", backup_id);
200
201 Ok(Vec::new())
204 }
205
206 async fn verify_tools(&self) -> Result<()> {
209 debug!("Verifying backup verification tools");
210
211 match Command::new("pg_verifybackup").arg("--version").output() {
213 Ok(output) if output.status.success() => {
214 debug!("pg_verifybackup is available");
215 }
216 _ => {
217 warn!("pg_verifybackup not available, using alternative verification methods");
218 }
219 }
220
221 let output = Command::new("pg_dump")
223 .arg("--version")
224 .output()
225 .map_err(|e| BackupError::ConfigurationError {
226 message: format!("pg_dump not found: {e}"),
227 })?;
228
229 if !output.status.success() {
230 return Err(BackupError::ConfigurationError {
231 message: "pg_dump is not working properly".to_string(),
232 });
233 }
234
235 debug!("Backup verification tools verified");
236 Ok(())
237 }
238
239 async fn verify_backup_file_exists(
240 &self,
241 backup: &BackupMetadata,
242 result: &mut VerificationResult,
243 ) -> Result<bool> {
244 debug!(
245 "Verifying backup file exists: {}",
246 backup.file_path.display()
247 );
248
249 if !backup.file_path.exists() {
250 result
251 .issues_found
252 .push("Backup file does not exist".to_string());
253 result.error_message = Some(format!(
254 "Backup file not found: {}",
255 backup.file_path.display()
256 ));
257 return Ok(false);
258 }
259
260 match fs::metadata(&backup.file_path).await {
262 Ok(metadata) => {
263 if metadata.len() == 0 {
264 result.issues_found.push("Backup file is empty".to_string());
265 return Ok(false);
266 }
267 debug!("Backup file exists and is {} bytes", metadata.len());
268 }
269 Err(e) => {
270 result
271 .issues_found
272 .push(format!("Cannot read backup file metadata: {e}"));
273 return Ok(false);
274 }
275 }
276
277 Ok(true)
278 }
279
280 async fn verify_backup_checksum(
281 &self,
282 backup: &BackupMetadata,
283 result: &mut VerificationResult,
284 ) -> Result<bool> {
285 debug!("Verifying backup checksum");
286
287 if backup.checksum.is_empty() {
288 result
289 .issues_found
290 .push("No checksum available for verification".to_string());
291 return Ok(true); }
293
294 let calculated_checksum = self.calculate_file_checksum(&backup.file_path).await?;
295
296 if calculated_checksum != backup.checksum {
297 result
298 .issues_found
299 .push("Checksum mismatch detected".to_string());
300 result.error_message = Some(format!(
301 "Expected checksum {}, got {}",
302 backup.checksum, calculated_checksum
303 ));
304 return Ok(false);
305 }
306
307 result.checksum_verified = true;
308 debug!("Backup checksum verified successfully");
309 Ok(true)
310 }
311
312 async fn verify_backup_structure(
313 &self,
314 backup: &BackupMetadata,
315 result: &mut VerificationResult,
316 ) -> Result<bool> {
317 debug!("Verifying backup file structure");
318
319 let mut cmd = Command::new("pg_restore");
321 cmd.arg("--list").arg(&backup.file_path);
322
323 let output = match cmd.output() {
324 Ok(output) => output,
325 Err(e) => {
326 result
327 .issues_found
328 .push(format!("Failed to execute pg_restore --list: {e}"));
329 return Ok(false);
330 }
331 };
332
333 if !output.status.success() {
334 let error_msg = String::from_utf8_lossy(&output.stderr);
335 result
336 .issues_found
337 .push(format!("pg_restore --list failed: {error_msg}"));
338 return Ok(false);
339 }
340
341 let list_output = String::from_utf8_lossy(&output.stdout);
343 if list_output.is_empty() {
344 result
345 .issues_found
346 .push("Backup appears to be empty or corrupted".to_string());
347 return Ok(false);
348 }
349
350 let expected_objects = ["TABLE", "SEQUENCE", "INDEX"];
352 for obj_type in &expected_objects {
353 if !list_output.contains(obj_type) {
354 result
355 .issues_found
356 .push(format!("Missing expected object type: {obj_type}"));
357 }
358 }
359
360 result.file_structure_valid = true;
361 debug!("Backup file structure verified successfully");
362 Ok(true)
363 }
364
365 async fn perform_restoration_test(
366 &self,
367 backup: &BackupMetadata,
368 result: &mut VerificationResult,
369 ) -> Result<bool> {
370 debug!("Performing restoration test");
371
372 let test_db_name = format!("backup_test_{}", uuid::Uuid::new_v4().simple());
374
375 if !self.create_test_database(&test_db_name).await? {
377 result
378 .issues_found
379 .push("Failed to create test database".to_string());
380 return Ok(false);
381 }
382
383 let restoration_success = self
385 .restore_to_test_database(backup, &test_db_name, result)
386 .await?;
387
388 if let Err(e) = self.drop_test_database(&test_db_name).await {
390 warn!("Failed to clean up test database {}: {}", test_db_name, e);
391 }
392
393 if restoration_success {
394 result.restoration_test_passed = true;
395 debug!("Restoration test passed successfully");
396 }
397
398 Ok(restoration_success)
399 }
400
401 async fn verify_database_consistency(
402 &self,
403 _backup: &BackupMetadata,
404 result: &mut VerificationResult,
405 ) -> Result<bool> {
406 debug!("Verifying database consistency");
407
408 if result.restoration_test_passed {
411 result.database_consistency_verified = true;
412 debug!("Database consistency verified");
413 Ok(true)
414 } else {
415 result
416 .issues_found
417 .push("Cannot verify consistency - restoration test failed".to_string());
418 Ok(false)
419 }
420 }
421
422 async fn create_test_database(&self, db_name: &str) -> Result<bool> {
423 debug!("Creating test database: {}", db_name);
424
425 let query = format!("CREATE DATABASE {db_name}");
426 match sqlx::query(&query).execute(self.db_pool.as_ref()).await {
427 Ok(_) => {
428 debug!("Test database created successfully");
429 Ok(true)
430 }
431 Err(e) => {
432 error!("Failed to create test database: {}", e);
433 Ok(false)
434 }
435 }
436 }
437
438 async fn restore_to_test_database(
439 &self,
440 backup: &BackupMetadata,
441 test_db_name: &str,
442 result: &mut VerificationResult,
443 ) -> Result<bool> {
444 debug!("Restoring backup to test database: {}", test_db_name);
445
446 let mut cmd = Command::new("pg_restore");
447 cmd.arg("--verbose")
448 .arg("--no-privileges")
449 .arg("--no-owner")
450 .arg("--dbname")
451 .arg(test_db_name)
452 .arg(&backup.file_path);
453
454 let output = match cmd.output() {
455 Ok(output) => output,
456 Err(e) => {
457 result
458 .issues_found
459 .push(format!("Failed to execute pg_restore: {e}"));
460 return Ok(false);
461 }
462 };
463
464 if !output.status.success() {
465 let error_msg = String::from_utf8_lossy(&output.stderr);
466 result
467 .issues_found
468 .push(format!("pg_restore to test database failed: {error_msg}"));
469 return Ok(false);
470 }
471
472 debug!("Backup restored to test database successfully");
473 Ok(true)
474 }
475
476 async fn drop_test_database(&self, db_name: &str) -> Result<()> {
477 debug!("Dropping test database: {}", db_name);
478
479 let query = format!("DROP DATABASE IF EXISTS {db_name}");
480 sqlx::query(&query).execute(self.db_pool.as_ref()).await?;
481
482 debug!("Test database dropped successfully");
483 Ok(())
484 }
485
486 async fn calculate_file_checksum(&self, file_path: &Path) -> Result<String> {
487 use sha2::{Digest, Sha256};
488
489 let contents = fs::read(file_path).await?;
490 let mut hasher = Sha256::new();
491 hasher.update(&contents);
492 let result = hasher.finalize();
493 Ok(format!("{result:x}"))
494 }
495
496 async fn finalize_verification_result(
497 &self,
498 mut result: VerificationResult,
499 start_time: chrono::DateTime<chrono::Utc>,
500 success: bool,
501 ) -> Result<VerificationResult> {
502 let duration = Utc::now().signed_duration_since(start_time);
503 result.duration_seconds = duration.num_seconds() as u32;
504
505 result.integrity_check_passed = result.checksum_verified && result.file_structure_valid;
507
508 {
510 let mut stats = self.verification_stats.write().await;
511 stats.total_verifications += 1;
512 if success {
513 stats.successful_verifications += 1;
514 } else {
515 stats.failed_verifications += 1;
516 }
517 stats.last_verification_time = Some(Utc::now());
518
519 let total_duration = stats.average_verification_duration_seconds
521 * (stats.total_verifications - 1) as f64;
522 stats.average_verification_duration_seconds = (total_duration
523 + result.duration_seconds as f64)
524 / stats.total_verifications as f64;
525 }
526
527 if success {
528 info!(
529 "Backup verification successful: {} ({}s)",
530 result.backup_id, result.duration_seconds
531 );
532 } else {
533 error!(
534 "Backup verification failed: {} ({}s): {:?}",
535 result.backup_id, result.duration_seconds, result.issues_found
536 );
537 }
538
539 Ok(result)
540 }
541
542 async fn get_all_backups(&self) -> Result<Vec<BackupMetadata>> {
543 Ok(Vec::new())
546 }
547
548 async fn get_backups_needing_verification(&self) -> Result<Vec<BackupMetadata>> {
549 Ok(Vec::new())
552 }
553
554 async fn store_verification_result(&self, result: &VerificationResult) -> Result<()> {
555 debug!(
556 "Storing verification result for backup: {}",
557 result.backup_id
558 );
559
560 info!(
563 "Verification result: {} - Success: {}, Duration: {}s",
564 result.backup_id,
565 result.integrity_check_passed && result.restoration_test_passed,
566 result.duration_seconds
567 );
568
569 Ok(())
570 }
571}
572
573impl Default for VerificationStats {
574 fn default() -> Self {
575 Self {
576 total_verifications: 0,
577 successful_verifications: 0,
578 failed_verifications: 0,
579 last_verification_time: None,
580 average_verification_duration_seconds: 0.0,
581 }
582 }
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588
589 #[test]
590 fn test_verification_stats_default() {
591 let stats = VerificationStats::default();
592 assert_eq!(stats.total_verifications, 0);
593 assert_eq!(stats.successful_verifications, 0);
594 assert_eq!(stats.failed_verifications, 0);
595 assert_eq!(stats.average_verification_duration_seconds, 0.0);
596 }
597
598 #[test]
599 fn test_verification_result_creation() {
600 let result = VerificationResult {
601 backup_id: "test-backup".to_string(),
602 verification_time: Utc::now(),
603 integrity_check_passed: false,
604 restoration_test_passed: false,
605 checksum_verified: false,
606 file_structure_valid: false,
607 database_consistency_verified: false,
608 duration_seconds: 0,
609 issues_found: Vec::new(),
610 error_message: None,
611 };
612
613 assert_eq!(result.backup_id, "test-backup");
614 assert!(!result.integrity_check_passed);
615 assert!(result.issues_found.is_empty());
616 }
617}