use super::{BackupConfig, BackupError, BackupMetadata, BackupStatus, BackupType, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::path::Path;
use std::sync::Arc;
use tokio::fs;
use tokio::process::Command;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

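/// Coordinates PostgreSQL backups for disaster recovery: full `pg_dump`
/// backups, incremental WAL-based backups, retention cleanup, and health
/// statistics, with all metadata tracked in a `backup_metadata` table.
///
/// Minimal usage sketch (assumes a configured `BackupConfig` and a live
/// `PgPool`; needs a running database, so it is not compiled as a doctest):
///
/// ```ignore
/// let manager = BackupManager::new(config, pool);
/// manager.initialize().await?;
/// let full = manager.create_full_backup().await?;
/// println!("backup {} written to {}", full.id, full.file_path.display());
/// ```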
pub struct BackupManager {
    config: BackupConfig,
    db_pool: Arc<PgPool>,
    metadata_store: BackupMetadataStore,
}

impl BackupManager {
    pub fn new(config: BackupConfig, db_pool: Arc<PgPool>) -> Self {
        let metadata_store = BackupMetadataStore::new(db_pool.clone());
        Self {
            config,
            db_pool,
            metadata_store,
        }
    }

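    /// Creates the backup and WAL archive directories, sets up the metadata
    /// table, and verifies the PostgreSQL WAL configuration.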
    pub async fn initialize(&self) -> Result<()> {
        info!("Initializing backup manager");

        // Ensure the backup and WAL archive directories exist.
        fs::create_dir_all(&self.config.backup_directory).await?;
        fs::create_dir_all(&self.config.wal_archive_directory).await?;

        // Create the metadata table if it does not exist yet.
        self.metadata_store.initialize().await?;

        // Fail fast if the server cannot support WAL-based backups.
        self.verify_postgres_config().await?;

        info!("Backup manager initialized successfully");
        Ok(())
    }

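    /// Runs a full `pg_dump` backup, recording WAL LSN bounds, file size, and
    /// a SHA-256 checksum in the metadata store, and replicates the result
    /// when replication is enabled.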
    pub async fn create_full_backup(&self) -> Result<BackupMetadata> {
        let backup_id = Uuid::new_v4().to_string();
        let start_time = Utc::now();

        info!("Starting full backup: {}", backup_id);

        let backup_filename = format!(
            "full_backup_{}_{}.sql.gz",
            start_time.format("%Y%m%d_%H%M%S"),
            backup_id
        );
        let backup_path = self.config.backup_directory.join(&backup_filename);

        self.check_disk_space(&backup_path).await?;

        let mut metadata = BackupMetadata {
            id: backup_id.clone(),
            backup_type: BackupType::Full,
            status: BackupStatus::InProgress,
            start_time,
            end_time: None,
            size_bytes: 0,
            compressed_size_bytes: 0,
            file_path: backup_path.clone(),
            checksum: String::new(),
            database_name: self.extract_database_name()?,
            wal_start_lsn: None,
            wal_end_lsn: None,
            encryption_enabled: self.config.enable_encryption,
            replication_status: std::collections::HashMap::new(),
            verification_status: None,
        };

        // Record the in-progress backup up front so failures stay visible.
        self.metadata_store.store_metadata(&metadata).await?;

        let start_lsn = self.get_current_wal_lsn().await?;
        metadata.wal_start_lsn = Some(start_lsn);

        match self
            .execute_pg_dump(&backup_path, &metadata.database_name)
            .await
        {
            Ok(_) => {
                let end_time = Utc::now();
                metadata.end_time = Some(end_time);
                metadata.status = BackupStatus::Completed;

                let end_lsn = self.get_current_wal_lsn().await?;
                metadata.wal_end_lsn = Some(end_lsn);

                let file_metadata = fs::metadata(&backup_path).await?;
                metadata.compressed_size_bytes = file_metadata.len();
                metadata.checksum = self.calculate_file_checksum(&backup_path).await?;

                self.metadata_store.update_metadata(&metadata).await?;

                info!(
                    "Full backup completed successfully: {} ({} bytes)",
                    backup_id, metadata.compressed_size_bytes
                );

                if self.config.enable_replication {
                    self.replicate_backup(&metadata).await?;
                }

                Ok(metadata)
            }
            Err(e) => {
                metadata.status = BackupStatus::Failed;
                metadata.end_time = Some(Utc::now());
                self.metadata_store.update_metadata(&metadata).await?;

                error!("Full backup failed: {}", e);
                Err(e)
            }
        }
    }

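    /// Creates an incremental backup covering WAL activity since the end LSN
    /// of the most recent completed backup. Fails if no prior backup exists.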
    pub async fn create_incremental_backup(&self) -> Result<BackupMetadata> {
        let backup_id = Uuid::new_v4().to_string();
        let start_time = Utc::now();

        info!("Starting incremental backup: {}", backup_id);

        // Resume from the end LSN of the last completed backup.
        let last_backup = self.metadata_store.get_latest_backup().await?;
        let start_lsn = match last_backup {
            Some(backup) => {
                if let Some(lsn) = backup.wal_end_lsn {
                    lsn
                } else {
                    warn!("Last backup has no end LSN, using current LSN");
                    self.get_current_wal_lsn().await.unwrap_or_default()
                }
            }
            None => {
                return Err(BackupError::BackupFailed {
                    message: "No previous backup found for incremental backup".to_string(),
                });
            }
        };

        let backup_filename = format!(
            "incremental_backup_{}_{}.tar.gz",
            start_time.format("%Y%m%d_%H%M%S"),
            backup_id
        );
        let backup_path = self.config.backup_directory.join(&backup_filename);

        let mut metadata = BackupMetadata {
            id: backup_id.clone(),
            backup_type: BackupType::Incremental,
            status: BackupStatus::InProgress,
            start_time,
            end_time: None,
            size_bytes: 0,
            compressed_size_bytes: 0,
            file_path: backup_path.clone(),
            checksum: String::new(),
            database_name: self.extract_database_name()?,
            wal_start_lsn: Some(start_lsn.clone()),
            wal_end_lsn: None,
            encryption_enabled: self.config.enable_encryption,
            replication_status: std::collections::HashMap::new(),
            verification_status: None,
        };

        self.metadata_store.store_metadata(&metadata).await?;

        match self.archive_wal_files(&start_lsn, &backup_path).await {
            Ok(end_lsn) => {
                let end_time = Utc::now();
                metadata.end_time = Some(end_time);
                metadata.status = BackupStatus::Completed;
                metadata.wal_end_lsn = Some(end_lsn);

                if backup_path.exists() {
                    let file_metadata = fs::metadata(&backup_path).await?;
                    metadata.compressed_size_bytes = file_metadata.len();
                    metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
                } else {
                    warn!("No new WAL files to archive since last backup");
                }

                self.metadata_store.update_metadata(&metadata).await?;

                info!("Incremental backup completed successfully: {}", backup_id);
                Ok(metadata)
            }
            Err(e) => {
                metadata.status = BackupStatus::Failed;
                metadata.end_time = Some(Utc::now());
                self.metadata_store.update_metadata(&metadata).await?;

                error!("Incremental backup failed: {}", e);
                Err(e)
            }
        }
    }

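    /// Deletes backups older than the configured retention window and marks
    /// their metadata rows as expired. Returns the number of backups removed.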
    pub async fn cleanup_expired_backups(&self) -> Result<u32> {
        info!(
            "Starting backup cleanup with retention policy of {} days",
            self.config.retention_days
        );

        let expired_backups = self
            .metadata_store
            .get_expired_backups(self.config.retention_days)
            .await?;
        let mut cleanup_count = 0;

        for backup in expired_backups {
            match self.delete_backup(&backup).await {
                Ok(_) => {
                    cleanup_count += 1;
                    info!("Deleted expired backup: {}", backup.id);
                }
                Err(e) => {
                    error!("Failed to delete expired backup {}: {}", backup.id, e);
                }
            }
        }

        info!("Cleanup completed: {} backups deleted", cleanup_count);
        Ok(cleanup_count)
    }

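    /// Summarizes backup health: overall totals, last-7-day success/failure
    /// counts, total completed-backup size, and whether a backup ran in the
    /// last 24 hours.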
    pub async fn get_backup_statistics(&self) -> Result<BackupStatistics> {
        let total_backups = self.metadata_store.count_backups().await?;
        let recent_backups = self.metadata_store.get_recent_backups(7).await?;

        let successful_backups = recent_backups
            .iter()
            .filter(|b| b.status == BackupStatus::Completed)
            .count();

        let failed_backups = recent_backups
            .iter()
            .filter(|b| b.status == BackupStatus::Failed)
            .count();

        let total_backup_size = recent_backups
            .iter()
            .filter(|b| b.status == BackupStatus::Completed)
            .map(|b| b.compressed_size_bytes)
            .sum();

        let latest_backup = self.metadata_store.get_latest_backup().await?;
        let backup_frequency_met = if let Some(latest) = &latest_backup {
            let hours_since_last = Utc::now()
                .signed_duration_since(latest.start_time)
                .num_hours();
            hours_since_last <= 24
        } else {
            false
        };

        Ok(BackupStatistics {
            total_backups,
            successful_backups_last_7_days: successful_backups as u32,
            failed_backups_last_7_days: failed_backups as u32,
            total_backup_size_bytes: total_backup_size,
            latest_backup_time: latest_backup.map(|b| b.start_time),
            backup_frequency_met,
            rto_target_minutes: self.config.rto_minutes,
            rpo_target_minutes: self.config.rpo_minutes,
        })
    }

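    /// Verifies that `wal_level` is `replica` or `logical` and warns when
    /// `archive_mode` is off, since continuous archiving depends on it.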
    async fn verify_postgres_config(&self) -> Result<()> {
        debug!("Verifying PostgreSQL configuration for backups");

        let wal_level: String = sqlx::query_scalar("SHOW wal_level")
            .fetch_one(self.db_pool.as_ref())
            .await?;

        if wal_level != "replica" && wal_level != "logical" {
            return Err(BackupError::ConfigurationError {
                message: format!("WAL level must be 'replica' or 'logical', found: {wal_level}"),
            });
        }

        let archive_mode: String = sqlx::query_scalar("SHOW archive_mode")
            .fetch_one(self.db_pool.as_ref())
            .await?;

        if archive_mode != "on" {
            warn!("Archive mode is not enabled, continuous archiving will not work");
        }

        debug!("PostgreSQL configuration verified");
        Ok(())
    }

    fn extract_database_name(&self) -> Result<String> {
        // The database name is currently hardcoded; parsing it from the
        // connection configuration would make this reusable across databases.
        Ok("codex_memory".to_string())
    }

    async fn get_current_wal_lsn(&self) -> Result<String> {
        self.retry_database_operation(|| async {
            // Cast the pg_lsn value to text so it decodes cleanly into a String.
            let lsn: String = sqlx::query_scalar("SELECT pg_current_wal_lsn()::text")
                .fetch_one(self.db_pool.as_ref())
                .await?;
            Ok(lsn)
        })
        .await
    }

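    /// Retries a fallible database operation up to three times with
    /// exponential backoff (200ms, 400ms, 800ms) before surfacing the error.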
    async fn retry_database_operation<T, F, Fut>(&self, operation: F) -> Result<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let mut retries = 0;
        let max_retries = 3;

        loop {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    if retries >= max_retries {
                        return Err(e);
                    }

                    retries += 1;
                    let delay = std::time::Duration::from_millis(100 * (1 << retries));
                    warn!(
                        "Database operation failed (attempt {}), retrying in {:?}: {}",
                        retries, delay, e
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }
    }

    async fn check_disk_space(&self, backup_path: &Path) -> Result<()> {
        let backup_dir = backup_path.parent().unwrap_or_else(|| Path::new("/"));

        // This only verifies that the directory is writable; it does not
        // measure free space against the expected backup size.
        let test_file = backup_dir.join(".backup_space_test");
        match fs::write(&test_file, b"test").await {
            Ok(_) => {
                let _ = fs::remove_file(&test_file).await;
                debug!("Disk space check passed for {}", backup_dir.display());
                Ok(())
            }
            Err(e) => {
                error!("Insufficient disk space or permissions for backup: {}", e);
                Err(BackupError::BackupFailed {
                    message: format!("Cannot write to backup directory: {e}"),
                })
            }
        }
    }

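    /// Runs `pg_dump` in custom format (compressed internally by pg_dump)
    /// against the given database and writes the archive to `backup_path`.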
    async fn execute_pg_dump(&self, backup_path: &Path, database_name: &str) -> Result<()> {
        debug!("Executing pg_dump to {}", backup_path.display());

        let mut cmd = Command::new("pg_dump");
        cmd.arg("--verbose")
            .arg("--format=custom")
            .arg("--compress=9")
            .arg("--no-privileges")
            .arg("--no-owner")
            .arg("--dbname")
            .arg(database_name)
            .arg("--file")
            .arg(backup_path);

        // Connection parameters are hardcoded defaults for now; they should
        // come from the backup configuration.
        cmd.arg("--host")
            .arg("localhost")
            .arg("--port")
            .arg("5432")
            .arg("--username")
            .arg("postgres");

        let output = cmd.output().await.map_err(|e| BackupError::BackupFailed {
            message: format!("Failed to execute pg_dump: {e}"),
        })?;

        if !output.status.success() {
            let error_msg = String::from_utf8_lossy(&output.stderr);
            return Err(BackupError::BackupFailed {
                message: format!("pg_dump failed: {error_msg}"),
            });
        }

        debug!("pg_dump completed successfully");
        Ok(())
    }

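    /// Records the WAL LSN range for an incremental backup. Currently a
    /// placeholder: segment copying is assumed to be handled by PostgreSQL's
    /// `archive_command`, and only an empty archive file is written here.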
    async fn archive_wal_files(&self, start_lsn: &str, backup_path: &Path) -> Result<String> {
        debug!("Archiving WAL files from LSN: {}", start_lsn);

        let current_lsn = self.get_current_wal_lsn().await?;

        // Placeholder: create an empty archive so downstream checksum and
        // size bookkeeping succeed; WAL segments are not yet copied here.
        fs::write(backup_path, b"").await?;

        Ok(current_lsn)
    }

    async fn calculate_file_checksum(&self, file_path: &Path) -> Result<String> {
        use sha2::{Digest, Sha256};

        // Reads the entire file into memory; a streaming hash would be
        // preferable for very large backups.
        let contents = fs::read(file_path).await?;
        let mut hasher = Sha256::new();
        hasher.update(&contents);
        let result = hasher.finalize();
        Ok(format!("{result:x}"))
    }

    async fn replicate_backup(&self, metadata: &BackupMetadata) -> Result<()> {
        debug!("Replicating backup: {}", metadata.id);

        // Placeholder: replication targets are only logged; the actual
        // transfer to each target is not implemented yet.
        for target in &self.config.replication_targets {
            info!("Replicating to target: {}", target.name);
        }

        Ok(())
    }

    async fn delete_backup(&self, backup: &BackupMetadata) -> Result<()> {
        debug!("Deleting backup: {}", backup.id);

        if backup.file_path.exists() {
            fs::remove_file(&backup.file_path).await?;
        }

        self.metadata_store.mark_backup_expired(&backup.id).await?;

        Ok(())
    }
}

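/// Persistence layer for backup metadata, stored in a `backup_metadata`
/// table in the same PostgreSQL instance.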
struct BackupMetadataStore {
    db_pool: Arc<PgPool>,
}

impl BackupMetadataStore {
    fn new(db_pool: Arc<PgPool>) -> Self {
        Self { db_pool }
    }

    async fn initialize(&self) -> Result<()> {
        debug!("Initializing backup metadata store");

        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS backup_metadata (
                id VARCHAR PRIMARY KEY,
                backup_type VARCHAR NOT NULL,
                status VARCHAR NOT NULL,
                start_time TIMESTAMPTZ NOT NULL,
                end_time TIMESTAMPTZ,
                size_bytes BIGINT DEFAULT 0,
                compressed_size_bytes BIGINT DEFAULT 0,
                file_path TEXT NOT NULL,
                checksum VARCHAR,
                database_name VARCHAR NOT NULL,
                wal_start_lsn VARCHAR,
                wal_end_lsn VARCHAR,
                encryption_enabled BOOLEAN DEFAULT false,
                metadata JSONB DEFAULT '{}',
                created_at TIMESTAMPTZ DEFAULT NOW()
            )
            "#,
        )
        .execute(self.db_pool.as_ref())
        .await?;

        debug!("Backup metadata store initialized");
        Ok(())
    }

    async fn store_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO backup_metadata (
                id, backup_type, status, start_time, end_time,
                size_bytes, compressed_size_bytes, file_path, checksum,
                database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
            "#,
        )
        .bind(&metadata.id)
        .bind(format!("{:?}", metadata.backup_type))
        .bind(format!("{:?}", metadata.status))
        .bind(metadata.start_time)
        .bind(metadata.end_time)
        .bind(metadata.size_bytes as i64)
        .bind(metadata.compressed_size_bytes as i64)
        .bind(metadata.file_path.to_string_lossy().as_ref())
        .bind(&metadata.checksum)
        .bind(&metadata.database_name)
        .bind(&metadata.wal_start_lsn)
        .bind(&metadata.wal_end_lsn)
        .bind(metadata.encryption_enabled)
        .execute(self.db_pool.as_ref())
        .await?;

        Ok(())
    }

    async fn update_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE backup_metadata SET
                status = $2, end_time = $3, size_bytes = $4,
                compressed_size_bytes = $5, checksum = $6,
                wal_end_lsn = $7
            WHERE id = $1
            "#,
        )
        .bind(&metadata.id)
        .bind(format!("{:?}", metadata.status))
        .bind(metadata.end_time)
        .bind(metadata.size_bytes as i64)
        .bind(metadata.compressed_size_bytes as i64)
        .bind(&metadata.checksum)
        .bind(&metadata.wal_end_lsn)
        .execute(self.db_pool.as_ref())
        .await?;

        Ok(())
    }

    async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>> {
        let row = sqlx::query(
            r#"
            SELECT id, backup_type, status, start_time, end_time,
                   size_bytes, compressed_size_bytes, file_path, checksum,
                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
            FROM backup_metadata
            WHERE status = 'Completed'
            ORDER BY start_time DESC
            LIMIT 1
            "#,
        )
        .fetch_optional(self.db_pool.as_ref())
        .await?;

        if let Some(row) = row {
            Ok(Some(self.row_to_metadata(row)?))
        } else {
            Ok(None)
        }
    }

    async fn get_expired_backups(&self, retention_days: u32) -> Result<Vec<BackupMetadata>> {
        // The retention window must be a bound parameter; an inline
        // '%d days' placeholder would never be substituted by sqlx.
        let rows = sqlx::query(
            r#"
            SELECT id, backup_type, status, start_time, end_time,
                   size_bytes, compressed_size_bytes, file_path, checksum,
                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
            FROM backup_metadata
            WHERE start_time < NOW() - make_interval(days => $1)
              AND status != 'Expired'
            "#,
        )
        .bind(retention_days as i32)
        .fetch_all(self.db_pool.as_ref())
        .await?;

        let mut backups = Vec::new();
        for row in rows {
            backups.push(self.row_to_metadata(row)?);
        }

        Ok(backups)
    }

    async fn get_recent_backups(&self, days: u32) -> Result<Vec<BackupMetadata>> {
        let rows = sqlx::query(
            r#"
            SELECT id, backup_type, status, start_time, end_time,
                   size_bytes, compressed_size_bytes, file_path, checksum,
                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
            FROM backup_metadata
            WHERE start_time > NOW() - make_interval(days => $1)
            ORDER BY start_time DESC
            "#,
        )
        .bind(days as i32)
        .fetch_all(self.db_pool.as_ref())
        .await?;

        let mut backups = Vec::new();
        for row in rows {
            backups.push(self.row_to_metadata(row)?);
        }

        Ok(backups)
    }

    async fn count_backups(&self) -> Result<u32> {
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM backup_metadata")
            .fetch_one(self.db_pool.as_ref())
            .await?;

        Ok(count as u32)
    }

    async fn mark_backup_expired(&self, backup_id: &str) -> Result<()> {
        sqlx::query("UPDATE backup_metadata SET status = 'Expired' WHERE id = $1")
            .bind(backup_id)
            .execute(self.db_pool.as_ref())
            .await?;

        Ok(())
    }

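    /// Converts a `backup_metadata` row into a `BackupMetadata`, defaulting
    /// unknown type strings to `Full` and unknown statuses to `Failed`.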
    fn row_to_metadata(&self, row: sqlx::postgres::PgRow) -> Result<BackupMetadata> {
        use sqlx::Row;

        let backup_type_str: String = row.try_get("backup_type")?;
        let backup_type = match backup_type_str.as_str() {
            "Full" => BackupType::Full,
            "Incremental" => BackupType::Incremental,
            "Differential" => BackupType::Differential,
            "WalArchive" => BackupType::WalArchive,
            _ => BackupType::Full,
        };

        let status_str: String = row.try_get("status")?;
        let status = match status_str.as_str() {
            "InProgress" => BackupStatus::InProgress,
            "Completed" => BackupStatus::Completed,
            "Failed" => BackupStatus::Failed,
            "Expired" => BackupStatus::Expired,
            "Archived" => BackupStatus::Archived,
            _ => BackupStatus::Failed,
        };

        Ok(BackupMetadata {
            id: row.try_get("id")?,
            backup_type,
            status,
            start_time: row.try_get("start_time")?,
            end_time: row.try_get("end_time")?,
            size_bytes: row.try_get::<i64, _>("size_bytes")? as u64,
            compressed_size_bytes: row.try_get::<i64, _>("compressed_size_bytes")? as u64,
            file_path: std::path::PathBuf::from(row.try_get::<String, _>("file_path")?),
            checksum: row.try_get("checksum")?,
            database_name: row.try_get("database_name")?,
            wal_start_lsn: row.try_get("wal_start_lsn")?,
            wal_end_lsn: row.try_get("wal_end_lsn")?,
            encryption_enabled: row.try_get("encryption_enabled")?,
            replication_status: std::collections::HashMap::new(),
            verification_status: None,
        })
    }
}

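/// Aggregate backup health metrics returned by
/// `BackupManager::get_backup_statistics`.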
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupStatistics {
    pub total_backups: u32,
    pub successful_backups_last_7_days: u32,
    pub failed_backups_last_7_days: u32,
    pub total_backup_size_bytes: u64,
    pub latest_backup_time: Option<DateTime<Utc>>,
    pub backup_frequency_met: bool,
    pub rto_target_minutes: u32,
    pub rpo_target_minutes: u32,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backup_config_default() {
        let config = BackupConfig::default();
        assert_eq!(config.retention_days, 30);
        assert_eq!(config.rto_minutes, 60);
        assert_eq!(config.rpo_minutes, 5);
        assert!(config.enable_encryption);
    }

    #[test]
    fn test_backup_metadata_creation() {
        let metadata = BackupMetadata {
            id: "test-backup".to_string(),
            backup_type: BackupType::Full,
            status: BackupStatus::InProgress,
            start_time: Utc::now(),
            end_time: None,
            size_bytes: 0,
            compressed_size_bytes: 0,
            file_path: std::path::PathBuf::from("/tmp/test.sql"),
            checksum: String::new(),
            database_name: "test_db".to_string(),
            wal_start_lsn: None,
            wal_end_lsn: None,
            encryption_enabled: true,
            replication_status: std::collections::HashMap::new(),
            verification_status: None,
        };

        assert_eq!(metadata.id, "test-backup");
        assert!(matches!(metadata.backup_type, BackupType::Full));
        assert!(metadata.encryption_enabled);
    }
}