1use super::repository::{BackupRepository, PostgresBackupRepository};
2use super::{BackupConfig, BackupError, BackupMetadata, BackupStatus, BackupType, Result};
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::path::Path;
6use std::process::Command;
7use std::sync::Arc;
8use tokio::fs;
9use tracing::{debug, error, info, warn};
10use uuid::Uuid;
11
/// Orchestrates database backup operations: full `pg_dump` snapshots,
/// incremental WAL archiving, retention cleanup, and summary statistics.
pub struct BackupManager {
    // Behavior settings: directories, retention window, encryption/replication flags,
    // RTO/RPO targets (see `BackupConfig`).
    config: BackupConfig,
    // Persistence layer for backup metadata and PostgreSQL introspection.
    // Trait object so tests can inject a mock via `with_repository`.
    repository: Arc<dyn BackupRepository>,
}
17
18impl BackupManager {
19 pub fn new(config: BackupConfig, db_pool: Arc<sqlx::PgPool>) -> Self {
20 let repository = Arc::new(PostgresBackupRepository::new(db_pool));
21 Self { config, repository }
22 }
23
24 pub fn with_repository(config: BackupConfig, repository: Arc<dyn BackupRepository>) -> Self {
25 Self { config, repository }
26 }
27
28 pub async fn initialize(&self) -> Result<()> {
30 info!("Initializing backup manager");
31
32 fs::create_dir_all(&self.config.backup_directory).await?;
34 fs::create_dir_all(&self.config.wal_archive_directory).await?;
35
36 self.repository.initialize().await?;
38
39 self.repository.verify_postgres_config().await?;
41
42 info!("Backup manager initialized successfully");
43 Ok(())
44 }
45
46 pub async fn create_full_backup(&self) -> Result<BackupMetadata> {
48 let backup_id = Uuid::new_v4().to_string();
49 let start_time = Utc::now();
50
51 info!("Starting full backup: {}", backup_id);
52
53 let backup_filename = format!(
54 "full_backup_{}_{}.sql.gz",
55 start_time.format("%Y%m%d_%H%M%S"),
56 backup_id
57 );
58 let backup_path = self.config.backup_directory.join(&backup_filename);
59
60 self.check_disk_space(&backup_path).await?;
62
63 let mut metadata = BackupMetadata {
64 id: backup_id.clone(),
65 backup_type: BackupType::Full,
66 status: BackupStatus::InProgress,
67 start_time,
68 end_time: None,
69 size_bytes: 0,
70 compressed_size_bytes: 0,
71 file_path: backup_path.clone(),
72 checksum: String::new(),
73 database_name: self.extract_database_name()?,
74 wal_start_lsn: None,
75 wal_end_lsn: None,
76 encryption_enabled: self.config.enable_encryption,
77 replication_status: std::collections::HashMap::new(),
78 verification_status: None,
79 };
80
81 self.repository.store_metadata(&metadata).await?;
83
84 let start_lsn = self.repository.get_current_wal_lsn().await?;
86 metadata.wal_start_lsn = Some(start_lsn);
87
88 match self
90 .execute_pg_dump(&backup_path, &metadata.database_name)
91 .await
92 {
93 Ok(_) => {
94 let end_time = Utc::now();
95 metadata.end_time = Some(end_time);
96 metadata.status = BackupStatus::Completed;
97
98 let end_lsn = self.repository.get_current_wal_lsn().await?;
100 metadata.wal_end_lsn = Some(end_lsn);
101
102 let file_metadata = fs::metadata(&backup_path).await?;
104 metadata.compressed_size_bytes = file_metadata.len();
105 metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
106
107 self.repository.update_metadata(&metadata).await?;
109
110 info!(
111 "Full backup completed successfully: {} ({} bytes)",
112 backup_id, metadata.compressed_size_bytes
113 );
114
115 if self.config.enable_replication {
117 self.replicate_backup(&metadata).await?;
118 }
119
120 Ok(metadata)
121 }
122 Err(e) => {
123 metadata.status = BackupStatus::Failed;
124 metadata.end_time = Some(Utc::now());
125 self.repository.update_metadata(&metadata).await?;
126
127 error!("Full backup failed: {}", e);
128 Err(e)
129 }
130 }
131 }
132
133 pub async fn create_incremental_backup(&self) -> Result<BackupMetadata> {
135 let backup_id = Uuid::new_v4().to_string();
136 let start_time = Utc::now();
137
138 info!("Starting incremental backup: {}", backup_id);
139
140 let last_backup = self.repository.get_latest_backup().await?;
142 let start_lsn = match last_backup {
143 Some(backup) => {
144 if let Some(lsn) = backup.wal_end_lsn {
145 lsn
146 } else {
147 warn!("Last backup has no end LSN, using current LSN");
148 self.repository
149 .get_current_wal_lsn()
150 .await
151 .unwrap_or_default()
152 }
153 }
154 None => {
155 return Err(BackupError::BackupFailed {
156 message: "No previous backup found for incremental backup".to_string(),
157 });
158 }
159 };
160
161 let backup_filename = format!(
162 "incremental_backup_{}_{}.tar.gz",
163 start_time.format("%Y%m%d_%H%M%S"),
164 backup_id
165 );
166 let backup_path = self.config.backup_directory.join(&backup_filename);
167
168 let mut metadata = BackupMetadata {
169 id: backup_id.clone(),
170 backup_type: BackupType::Incremental,
171 status: BackupStatus::InProgress,
172 start_time,
173 end_time: None,
174 size_bytes: 0,
175 compressed_size_bytes: 0,
176 file_path: backup_path.clone(),
177 checksum: String::new(),
178 database_name: self.extract_database_name()?,
179 wal_start_lsn: Some(start_lsn.clone()),
180 wal_end_lsn: None,
181 encryption_enabled: self.config.enable_encryption,
182 replication_status: std::collections::HashMap::new(),
183 verification_status: None,
184 };
185
186 self.repository.store_metadata(&metadata).await?;
188
189 match self.archive_wal_files(&start_lsn, &backup_path).await {
191 Ok(end_lsn) => {
192 let end_time = Utc::now();
193 metadata.end_time = Some(end_time);
194 metadata.status = BackupStatus::Completed;
195 metadata.wal_end_lsn = Some(end_lsn);
196
197 if backup_path.exists() {
199 let file_metadata = fs::metadata(&backup_path).await?;
200 metadata.compressed_size_bytes = file_metadata.len();
201 metadata.checksum = self.calculate_file_checksum(&backup_path).await?;
202 } else {
203 warn!("No new WAL files to archive since last backup");
204 }
205
206 self.repository.update_metadata(&metadata).await?;
208
209 info!("Incremental backup completed successfully: {}", backup_id);
210 Ok(metadata)
211 }
212 Err(e) => {
213 metadata.status = BackupStatus::Failed;
214 metadata.end_time = Some(Utc::now());
215 self.repository.update_metadata(&metadata).await?;
216
217 error!("Incremental backup failed: {}", e);
218 Err(e)
219 }
220 }
221 }
222
223 pub async fn cleanup_expired_backups(&self) -> Result<u32> {
225 info!(
226 "Starting backup cleanup with retention policy of {} days",
227 self.config.retention_days
228 );
229
230 let expired_backups = self
231 .repository
232 .get_expired_backups(self.config.retention_days)
233 .await?;
234 let mut cleanup_count = 0;
235
236 for backup in expired_backups {
237 match self.delete_backup(&backup).await {
238 Ok(_) => {
239 cleanup_count += 1;
240 info!("Deleted expired backup: {}", backup.id);
241 }
242 Err(e) => {
243 error!("Failed to delete expired backup {}: {}", backup.id, e);
244 }
245 }
246 }
247
248 info!("Cleanup completed: {} backups deleted", cleanup_count);
249 Ok(cleanup_count)
250 }
251
252 pub async fn get_backup_statistics(&self) -> Result<BackupStatistics> {
254 let total_backups = self.repository.count_backups().await?;
255 let recent_backups = self.repository.get_recent_backups(7).await?;
256
257 let successful_backups = recent_backups
258 .iter()
259 .filter(|b| b.status == BackupStatus::Completed)
260 .count();
261
262 let failed_backups = recent_backups
263 .iter()
264 .filter(|b| b.status == BackupStatus::Failed)
265 .count();
266
267 let total_backup_size = recent_backups
268 .iter()
269 .filter(|b| b.status == BackupStatus::Completed)
270 .map(|b| b.compressed_size_bytes)
271 .sum();
272
273 let latest_backup = self.repository.get_latest_backup().await?;
274 let backup_frequency_met = if let Some(latest) = &latest_backup {
275 let hours_since_last = Utc::now()
276 .signed_duration_since(latest.start_time)
277 .num_hours();
278 hours_since_last <= 24 } else {
280 false
281 };
282
283 Ok(BackupStatistics {
284 total_backups,
285 successful_backups_last_7_days: successful_backups as u32,
286 failed_backups_last_7_days: failed_backups as u32,
287 total_backup_size_bytes: total_backup_size,
288 latest_backup_time: latest_backup.map(|b| b.start_time),
289 backup_frequency_met,
290 rto_target_minutes: self.config.rto_minutes,
291 rpo_target_minutes: self.config.rpo_minutes,
292 })
293 }
294
295 fn extract_database_name(&self) -> Result<String> {
300 Ok("codex_memory".to_string())
303 }
304
305 async fn retry_database_operation<T, F, Fut>(&self, operation: F) -> Result<T>
309 where
310 F: Fn() -> Fut,
311 Fut: std::future::Future<Output = Result<T>>,
312 {
313 let mut retries = 0;
314 let max_retries = 3;
315
316 loop {
317 match operation().await {
318 Ok(result) => return Ok(result),
319 Err(e) => {
320 if retries >= max_retries {
321 return Err(e);
322 }
323
324 retries += 1;
325 let delay = std::time::Duration::from_millis(100 * (1 << retries));
326 warn!(
327 "Database operation failed (attempt {}), retrying in {:?}: {}",
328 retries, delay, e
329 );
330 tokio::time::sleep(delay).await;
331 }
332 }
333 }
334 }
335
336 async fn check_disk_space(&self, backup_path: &Path) -> Result<()> {
338 let backup_dir = backup_path.parent().unwrap_or_else(|| Path::new("/"));
339
340 let test_file = backup_dir.join(".backup_space_test");
342 match fs::write(&test_file, b"test").await {
343 Ok(_) => {
344 let _ = fs::remove_file(&test_file).await;
345 debug!("Disk space check passed for {}", backup_dir.display());
346 Ok(())
347 }
348 Err(e) => {
349 error!("Insufficient disk space or permissions for backup: {}", e);
350 Err(BackupError::BackupFailed {
351 message: format!("Cannot write to backup directory: {e}"),
352 })
353 }
354 }
355 }
356
357 async fn execute_pg_dump(&self, backup_path: &Path, database_name: &str) -> Result<()> {
358 debug!("Executing pg_dump to {}", backup_path.display());
359
360 let mut cmd = Command::new("pg_dump");
361 cmd.arg("--verbose")
362 .arg("--format=custom")
363 .arg("--compress=9")
364 .arg("--no-privileges")
365 .arg("--no-owner")
366 .arg("--dbname")
367 .arg(database_name)
368 .arg("--file")
369 .arg(backup_path);
370
371 cmd.arg("--host")
374 .arg("localhost")
375 .arg("--port")
376 .arg("5432")
377 .arg("--username")
378 .arg("postgres");
379
380 let output = cmd.output().map_err(|e| BackupError::BackupFailed {
381 message: format!("Failed to execute pg_dump: {e}"),
382 })?;
383
384 if !output.status.success() {
385 let error_msg = String::from_utf8_lossy(&output.stderr);
386 return Err(BackupError::BackupFailed {
387 message: format!("pg_dump failed: {error_msg}"),
388 });
389 }
390
391 debug!("pg_dump completed successfully");
392 Ok(())
393 }
394
395 async fn archive_wal_files(&self, start_lsn: &str, backup_path: &Path) -> Result<String> {
396 debug!("Archiving WAL files from LSN: {}", start_lsn);
397
398 let current_lsn = self.repository.get_current_wal_lsn().await?;
401
402 tokio::fs::write(backup_path, b"").await?;
404
405 Ok(current_lsn)
406 }
407
408 async fn calculate_file_checksum(&self, file_path: &Path) -> Result<String> {
409 use sha2::{Digest, Sha256};
410
411 let contents = fs::read(file_path).await?;
412 let mut hasher = Sha256::new();
413 hasher.update(&contents);
414 let result = hasher.finalize();
415 Ok(format!("{result:x}"))
416 }
417
418 async fn replicate_backup(&self, metadata: &BackupMetadata) -> Result<()> {
419 debug!("Replicating backup: {}", metadata.id);
420
421 for target in &self.config.replication_targets {
422 info!("Replicating to target: {}", target.name);
423 }
426
427 Ok(())
428 }
429
430 async fn delete_backup(&self, backup: &BackupMetadata) -> Result<()> {
431 debug!("Deleting backup: {}", backup.id);
432
433 if backup.file_path.exists() {
435 fs::remove_file(&backup.file_path).await?;
436 }
437
438 self.repository.mark_backup_expired(&backup.id).await?;
440
441 Ok(())
442 }
443}
444
/// Summary of recent backup activity plus the configured recovery targets,
/// as produced by `BackupManager::get_backup_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupStatistics {
    /// Total number of backups recorded in the repository (all time).
    pub total_backups: u32,
    /// Count of backups with `Completed` status in the last 7 days.
    pub successful_backups_last_7_days: u32,
    /// Count of backups with `Failed` status in the last 7 days.
    pub failed_backups_last_7_days: u32,
    /// Sum of compressed sizes of completed backups in the last 7 days.
    pub total_backup_size_bytes: u64,
    /// Start time of the most recent backup, if any exist.
    pub latest_backup_time: Option<DateTime<Utc>>,
    /// True when the latest backup started within the last 24 hours.
    pub backup_frequency_met: bool,
    /// Configured recovery-time-objective target, in minutes.
    pub rto_target_minutes: u32,
    /// Configured recovery-point-objective target, in minutes.
    pub rpo_target_minutes: u32,
}
458
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config should match the documented retention/RTO/RPO
    /// targets and enable encryption.
    #[test]
    fn test_backup_config_default() {
        let cfg = BackupConfig::default();
        assert_eq!(cfg.retention_days, 30);
        assert_eq!(cfg.rto_minutes, 60);
        assert_eq!(cfg.rpo_minutes, 5);
        assert!(cfg.enable_encryption);
    }

    /// A freshly constructed metadata record keeps the fields it was given.
    #[test]
    fn test_backup_metadata_creation() {
        let now = Utc::now();
        let record = BackupMetadata {
            id: String::from("test-backup"),
            backup_type: BackupType::Full,
            status: BackupStatus::InProgress,
            start_time: now,
            end_time: None,
            size_bytes: 0,
            compressed_size_bytes: 0,
            file_path: std::path::PathBuf::from("/tmp/test.sql"),
            checksum: String::new(),
            database_name: String::from("test_db"),
            wal_start_lsn: None,
            wal_end_lsn: None,
            encryption_enabled: true,
            replication_status: Default::default(),
            verification_status: None,
        };

        assert_eq!(record.id, "test-backup");
        assert!(matches!(record.backup_type, BackupType::Full));
        assert!(record.encryption_enabled);
    }
}