1use crate::storage::{ChunkStorage, PinnedContentInfo, StorageError};
10use chie_crypto::hash;
11use serde::{Deserialize, Serialize};
12use std::collections::HashSet;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use thiserror::Error;
17use tokio::fs;
18use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
19use tracing::{debug, info, warn};
20
21#[derive(Debug, Error)]
23pub enum BackupError {
24 #[error("IO error: {0}")]
25 IoError(#[from] std::io::Error),
26
27 #[error("Serialization error: {0}")]
28 SerializationError(String),
29
30 #[error("Storage error: {0}")]
31 StorageError(#[from] StorageError),
32
33 #[error("Backup not found: {path}")]
34 BackupNotFound { path: String },
35
36 #[error("Invalid backup format: {0}")]
37 InvalidFormat(String),
38
39 #[error("Checksum mismatch: expected {expected}, got {actual}")]
40 ChecksumMismatch { expected: String, actual: String },
41
42 #[error("Backup cancelled")]
43 Cancelled,
44
45 #[error("Incompatible backup version: {version}")]
46 IncompatibleVersion { version: u32 },
47}
48
/// Format version written into every manifest and required when restoring.
const BACKUP_VERSION: u32 = 1;
51
/// Options controlling how backups are written and restored.
///
/// NOTE(review): in the code visible in this file only `verify_on_restore` is
/// actually consulted; confirm where the remaining options are applied.
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Whether compression of the archive is requested.
    pub compress: bool,
    /// Chunk size, in bytes, to use for the archive.
    pub archive_chunk_size: usize,
    /// Whether data should be verified while a backup is being written.
    pub verify_on_backup: bool,
    /// Whether each chunk's checksum is checked while restoring.
    pub verify_on_restore: bool,
    /// Whether metadata should be included in the backup.
    pub include_metadata: bool,
}

impl Default for BackupConfig {
    /// Enables every boolean option and uses a 4 MiB archive chunk size.
    fn default() -> Self {
        // 4 MiB.
        const DEFAULT_ARCHIVE_CHUNK_SIZE: usize = 4 * 1024 * 1024;

        BackupConfig {
            archive_chunk_size: DEFAULT_ARCHIVE_CHUNK_SIZE,
            compress: true,
            include_metadata: true,
            verify_on_backup: true,
            verify_on_restore: true,
        }
    }
}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct BackupManifest {
82 pub version: u32,
84 pub created_at: chrono::DateTime<chrono::Utc>,
86 pub backup_type: BackupType,
88 pub parent_backup_id: Option<String>,
90 pub backup_id: String,
92 pub content_items: Vec<BackupContentEntry>,
94 pub total_size: u64,
96 pub checksum: String,
98 pub source_path: String,
100}
101
102#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
104pub enum BackupType {
105 Full,
107 Incremental,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct BackupContentEntry {
114 pub cid: String,
116 pub chunk_count: u64,
118 pub total_size: u64,
120 pub chunk_checksums: Vec<String>,
122 pub archive_offset: u64,
124}
125
/// Shared progress and cancellation state for a backup, restore, or verify run.
///
/// Every field is wrapped in an [`Arc`], so cloning a `BackupProgress` produces
/// another handle onto the same counters and the same cancellation flag.
#[derive(Debug, Clone)]
pub struct BackupProgress {
    /// Total number of bytes the operation expects to process.
    pub total_bytes: Arc<AtomicU64>,
    /// Number of bytes processed so far.
    pub processed_bytes: Arc<AtomicU64>,
    /// Total number of content items the operation expects to process.
    pub total_items: Arc<AtomicU64>,
    /// Number of content items processed so far.
    pub processed_items: Arc<AtomicU64>,
    /// Human-readable description of the step currently running.
    pub current_operation: Arc<std::sync::RwLock<String>>,
    /// Set once cancellation has been requested.
    pub cancelled: Arc<AtomicBool>,
}

impl Default for BackupProgress {
    /// Equivalent to [`BackupProgress::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl BackupProgress {
    /// Creates a tracker with all counters at zero, an empty operation label,
    /// and cancellation not requested.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self {
            total_bytes: Arc::new(AtomicU64::new(0)),
            processed_bytes: Arc::new(AtomicU64::new(0)),
            total_items: Arc::new(AtomicU64::new(0)),
            processed_items: Arc::new(AtomicU64::new(0)),
            current_operation: Arc::new(std::sync::RwLock::new(String::new())),
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns `processed_bytes / total_bytes` as a percentage.
    ///
    /// Returns `0.0` while `total_bytes` is zero. The value is not clamped, so
    /// it exceeds `100.0` if more bytes are recorded than were announced.
    #[must_use]
    #[inline]
    pub fn percentage(&self) -> f64 {
        let total = self.total_bytes.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let processed = self.processed_bytes.load(Ordering::Relaxed);
        (processed as f64 / total as f64) * 100.0
    }

    /// Reports whether cancellation has been requested.
    #[must_use]
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Requests cancellation; the flag is never reset by this type.
    #[inline]
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }

    /// Replaces the current operation label with `op`.
    ///
    /// A poisoned lock is recovered rather than ignored: the guarded value is a
    /// plain `String` that is always in a valid state, and skipping the write
    /// would freeze the label for the rest of the run.
    #[inline]
    pub fn set_operation(&self, op: &str) {
        let mut label = self
            .current_operation
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        label.clear();
        label.push_str(op);
    }

    /// Adds `bytes` to the processed-bytes counter.
    #[inline]
    pub fn add_bytes(&self, bytes: u64) {
        self.processed_bytes.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Adds one to the processed-items counter.
    #[inline]
    pub fn increment_items(&self) {
        self.processed_items.fetch_add(1, Ordering::Relaxed);
    }
}
209
210#[derive(Debug, Clone)]
212pub struct BackupResult {
213 pub manifest: BackupManifest,
215 pub backup_path: PathBuf,
217 pub duration_secs: f64,
219 pub items_backed_up: usize,
221}
222
/// Outcome of a restore run.
#[derive(Debug, Clone)]
pub struct RestoreResult {
    /// Number of content items restored successfully.
    pub items_restored: usize,
    /// Number of chunks belonging to the restored items.
    pub chunks_restored: u64,
    /// Number of chunk bytes belonging to the restored items.
    pub bytes_restored: u64,
    /// Wall-clock duration of the run, in seconds.
    pub duration_secs: f64,
    /// CIDs of the items that could not be restored.
    pub failed_items: Vec<String>,
}
237
238pub struct BackupManager {
240 config: BackupConfig,
242}
243
244impl BackupManager {
245 #[must_use]
247 #[inline]
248 pub fn new(config: BackupConfig) -> Self {
249 Self { config }
250 }
251
252 pub async fn create_full_backup(
254 &self,
255 storage: &ChunkStorage,
256 backup_dir: &Path,
257 progress: Option<&BackupProgress>,
258 ) -> Result<BackupResult, BackupError> {
259 let start = std::time::Instant::now();
260
261 fs::create_dir_all(backup_dir).await?;
263
264 let backup_id = uuid::Uuid::new_v4().to_string();
265 let backup_path = backup_dir.join(format!("backup_{}.chie", backup_id));
266
267 info!("Creating full backup: {}", backup_id);
268
269 if let Some(p) = progress {
270 p.set_operation("Preparing backup");
271 }
272
273 let pinned_cids = storage.list_pinned();
275 let mut content_entries = Vec::new();
276 let mut total_size = 0u64;
277
278 if let Some(p) = progress {
279 p.total_items
280 .store(pinned_cids.len() as u64, Ordering::Relaxed);
281 }
282
283 for cid in &pinned_cids {
285 if let Some(info) = storage.get_pinned_info(cid) {
286 total_size += info.total_size;
287 }
288 }
289
290 if let Some(p) = progress {
291 p.total_bytes.store(total_size, Ordering::Relaxed);
292 }
293
294 let mut backup_file = fs::File::create(&backup_path).await?;
296 let mut archive_offset = 0u64;
297
298 let header_placeholder = vec![0u8; 1024];
300 backup_file.write_all(&header_placeholder).await?;
301 archive_offset += header_placeholder.len() as u64;
302
303 for cid in pinned_cids {
305 if let Some(p) = progress {
306 if p.is_cancelled() {
307 let _ = fs::remove_file(&backup_path).await;
309 return Err(BackupError::Cancelled);
310 }
311 p.set_operation(&format!("Backing up {}", cid));
312 }
313
314 let entry = self
315 .backup_content(
316 storage,
317 cid,
318 &mut backup_file,
319 &mut archive_offset,
320 progress,
321 )
322 .await?;
323
324 content_entries.push(entry);
325
326 if let Some(p) = progress {
327 p.increment_items();
328 }
329 }
330
331 let manifest_data = self.create_manifest_data(&content_entries)?;
333 let checksum = hex::encode(hash(&manifest_data));
334
335 let manifest = BackupManifest {
336 version: BACKUP_VERSION,
337 created_at: chrono::Utc::now(),
338 backup_type: BackupType::Full,
339 parent_backup_id: None,
340 backup_id: backup_id.clone(),
341 content_items: content_entries.clone(),
342 total_size,
343 checksum,
344 source_path: storage.base_path().to_string_lossy().to_string(),
345 };
346
347 let manifest_json = serde_json::to_vec_pretty(&manifest)
349 .map_err(|e| BackupError::SerializationError(e.to_string()))?;
350
351 backup_file.write_all(&manifest_json).await?;
352
353 let manifest_len = manifest_json.len() as u64;
355 backup_file.write_all(&manifest_len.to_le_bytes()).await?;
356
357 backup_file.flush().await?;
359 backup_file.sync_all().await?;
360
361 let duration = start.elapsed().as_secs_f64();
362
363 info!(
364 "Backup complete: {} items in {:.2}s",
365 content_entries.len(),
366 duration
367 );
368
369 Ok(BackupResult {
370 manifest,
371 backup_path,
372 duration_secs: duration,
373 items_backed_up: content_entries.len(),
374 })
375 }
376
377 pub async fn create_incremental_backup(
379 &self,
380 storage: &ChunkStorage,
381 backup_dir: &Path,
382 parent_manifest: &BackupManifest,
383 progress: Option<&BackupProgress>,
384 ) -> Result<BackupResult, BackupError> {
385 let start = std::time::Instant::now();
386
387 let backup_id = uuid::Uuid::new_v4().to_string();
388 let backup_path = backup_dir.join(format!("backup_{}_incr.chie", backup_id));
389
390 info!(
391 "Creating incremental backup: {} (parent: {})",
392 backup_id, parent_manifest.backup_id
393 );
394
395 if let Some(p) = progress {
396 p.set_operation("Analyzing changes");
397 }
398
399 let parent_cids: HashSet<_> = parent_manifest
401 .content_items
402 .iter()
403 .map(|e| e.cid.clone())
404 .collect();
405
406 let current_cids: HashSet<_> = storage
408 .list_pinned()
409 .into_iter()
410 .map(String::from)
411 .collect();
412 let new_cids: Vec<_> = current_cids.difference(&parent_cids).cloned().collect();
413
414 if new_cids.is_empty() {
415 info!("No changes detected, skipping backup");
416 return Ok(BackupResult {
417 manifest: BackupManifest {
418 version: BACKUP_VERSION,
419 created_at: chrono::Utc::now(),
420 backup_type: BackupType::Incremental,
421 parent_backup_id: Some(parent_manifest.backup_id.clone()),
422 backup_id,
423 content_items: vec![],
424 total_size: 0,
425 checksum: String::new(),
426 source_path: storage.base_path().to_string_lossy().to_string(),
427 },
428 backup_path,
429 duration_secs: start.elapsed().as_secs_f64(),
430 items_backed_up: 0,
431 });
432 }
433
434 fs::create_dir_all(backup_dir).await?;
436 let mut backup_file = fs::File::create(&backup_path).await?;
437 let mut archive_offset = 0u64;
438
439 let header_placeholder = vec![0u8; 1024];
441 backup_file.write_all(&header_placeholder).await?;
442 archive_offset += header_placeholder.len() as u64;
443
444 let mut content_entries = Vec::new();
445 let mut total_size = 0u64;
446
447 if let Some(p) = progress {
448 p.total_items
449 .store(new_cids.len() as u64, Ordering::Relaxed);
450 }
451
452 for cid in &new_cids {
454 if let Some(p) = progress {
455 if p.is_cancelled() {
456 let _ = fs::remove_file(&backup_path).await;
457 return Err(BackupError::Cancelled);
458 }
459 p.set_operation(&format!("Backing up {}", cid));
460 }
461
462 if let Some(info) = storage.get_pinned_info(cid) {
463 total_size += info.total_size;
464 }
465
466 let entry = self
467 .backup_content(
468 storage,
469 cid,
470 &mut backup_file,
471 &mut archive_offset,
472 progress,
473 )
474 .await?;
475
476 content_entries.push(entry);
477
478 if let Some(p) = progress {
479 p.increment_items();
480 }
481 }
482
483 let manifest_data = self.create_manifest_data(&content_entries)?;
485 let checksum = hex::encode(hash(&manifest_data));
486
487 let manifest = BackupManifest {
488 version: BACKUP_VERSION,
489 created_at: chrono::Utc::now(),
490 backup_type: BackupType::Incremental,
491 parent_backup_id: Some(parent_manifest.backup_id.clone()),
492 backup_id: backup_id.clone(),
493 content_items: content_entries.clone(),
494 total_size,
495 checksum,
496 source_path: storage.base_path().to_string_lossy().to_string(),
497 };
498
499 let manifest_json = serde_json::to_vec_pretty(&manifest)
501 .map_err(|e| BackupError::SerializationError(e.to_string()))?;
502 backup_file.write_all(&manifest_json).await?;
503
504 let manifest_len = manifest_json.len() as u64;
505 backup_file.write_all(&manifest_len.to_le_bytes()).await?;
506
507 backup_file.flush().await?;
508 backup_file.sync_all().await?;
509
510 let duration = start.elapsed().as_secs_f64();
511
512 info!(
513 "Incremental backup complete: {} items in {:.2}s",
514 content_entries.len(),
515 duration
516 );
517
518 Ok(BackupResult {
519 manifest,
520 backup_path,
521 duration_secs: duration,
522 items_backed_up: content_entries.len(),
523 })
524 }
525
526 pub async fn restore_backup(
528 &self,
529 backup_path: &Path,
530 storage: &mut ChunkStorage,
531 progress: Option<&BackupProgress>,
532 ) -> Result<RestoreResult, BackupError> {
533 let start = std::time::Instant::now();
534
535 if !backup_path.exists() {
536 return Err(BackupError::BackupNotFound {
537 path: backup_path.to_string_lossy().to_string(),
538 });
539 }
540
541 info!("Restoring from backup: {:?}", backup_path);
542
543 if let Some(p) = progress {
544 p.set_operation("Reading backup manifest");
545 }
546
547 let manifest = self.read_manifest(backup_path).await?;
549
550 if manifest.version != BACKUP_VERSION {
551 return Err(BackupError::IncompatibleVersion {
552 version: manifest.version,
553 });
554 }
555
556 if let Some(p) = progress {
557 p.total_items
558 .store(manifest.content_items.len() as u64, Ordering::Relaxed);
559 p.total_bytes.store(manifest.total_size, Ordering::Relaxed);
560 }
561
562 let mut items_restored = 0;
563 let mut chunks_restored = 0u64;
564 let mut bytes_restored = 0u64;
565 let mut failed_items = Vec::new();
566
567 let mut backup_file = fs::File::open(backup_path).await?;
569
570 for entry in &manifest.content_items {
572 if let Some(p) = progress {
573 if p.is_cancelled() {
574 return Err(BackupError::Cancelled);
575 }
576 p.set_operation(&format!("Restoring {}", entry.cid));
577 }
578
579 match self
580 .restore_content(entry, &mut backup_file, storage, progress)
581 .await
582 {
583 Ok((chunks, bytes)) => {
584 items_restored += 1;
585 chunks_restored += chunks;
586 bytes_restored += bytes;
587 }
588 Err(e) => {
589 warn!("Failed to restore {}: {}", entry.cid, e);
590 failed_items.push(entry.cid.clone());
591 }
592 }
593
594 if let Some(p) = progress {
595 p.increment_items();
596 }
597 }
598
599 let duration = start.elapsed().as_secs_f64();
600
601 info!(
602 "Restore complete: {} items, {} chunks, {} bytes in {:.2}s",
603 items_restored, chunks_restored, bytes_restored, duration
604 );
605
606 Ok(RestoreResult {
607 items_restored,
608 chunks_restored,
609 bytes_restored,
610 duration_secs: duration,
611 failed_items,
612 })
613 }
614
615 pub async fn read_manifest(&self, backup_path: &Path) -> Result<BackupManifest, BackupError> {
617 let mut file = fs::File::open(backup_path).await?;
618
619 let file_size = file.metadata().await?.len();
621 file.seek(std::io::SeekFrom::End(-8)).await?;
622
623 let mut len_bytes = [0u8; 8];
624 file.read_exact(&mut len_bytes).await?;
625 let manifest_len = u64::from_le_bytes(len_bytes) as usize;
626
627 let manifest_start = file_size - 8 - manifest_len as u64;
629 file.seek(std::io::SeekFrom::Start(manifest_start)).await?;
630
631 let mut manifest_data = vec![0u8; manifest_len];
632 file.read_exact(&mut manifest_data).await?;
633
634 let manifest: BackupManifest = serde_json::from_slice(&manifest_data)
635 .map_err(|e| BackupError::SerializationError(e.to_string()))?;
636
637 Ok(manifest)
638 }
639
640 pub async fn list_backups(
642 &self,
643 backup_dir: &Path,
644 ) -> Result<Vec<BackupManifest>, BackupError> {
645 let mut manifests = Vec::new();
646
647 if !backup_dir.exists() {
648 return Ok(manifests);
649 }
650
651 let mut entries = fs::read_dir(backup_dir).await?;
652 while let Some(entry) = entries.next_entry().await? {
653 let path = entry.path();
654 if path.extension().is_some_and(|ext| ext == "chie") {
655 match self.read_manifest(&path).await {
656 Ok(manifest) => manifests.push(manifest),
657 Err(e) => {
658 debug!("Skipping invalid backup {:?}: {}", path, e);
659 }
660 }
661 }
662 }
663
664 manifests.sort_by(|a, b| b.created_at.cmp(&a.created_at));
666
667 Ok(manifests)
668 }
669
670 pub async fn verify_backup(
672 &self,
673 backup_path: &Path,
674 progress: Option<&BackupProgress>,
675 ) -> Result<bool, BackupError> {
676 if let Some(p) = progress {
677 p.set_operation("Verifying backup integrity");
678 }
679
680 let manifest = self.read_manifest(backup_path).await?;
681
682 let manifest_data = self.create_manifest_data(&manifest.content_items)?;
684 let computed_checksum = hex::encode(hash(&manifest_data));
685
686 if computed_checksum != manifest.checksum {
687 return Ok(false);
688 }
689
690 let mut file = fs::File::open(backup_path).await?;
692
693 if let Some(p) = progress {
694 p.total_items
695 .store(manifest.content_items.len() as u64, Ordering::Relaxed);
696 }
697
698 for entry in &manifest.content_items {
699 if let Some(p) = progress {
700 if p.is_cancelled() {
701 return Err(BackupError::Cancelled);
702 }
703 p.set_operation(&format!("Verifying {}", entry.cid));
704 }
705
706 file.seek(std::io::SeekFrom::Start(entry.archive_offset))
708 .await?;
709
710 let mut count_bytes = [0u8; 8];
712 file.read_exact(&mut count_bytes).await?;
713 let stored_count = u64::from_le_bytes(count_bytes);
714
715 if stored_count != entry.chunk_count {
716 return Ok(false);
717 }
718
719 if let Some(p) = progress {
720 p.increment_items();
721 }
722 }
723
724 Ok(true)
725 }
726
727 async fn backup_content(
730 &self,
731 storage: &ChunkStorage,
732 cid: &str,
733 backup_file: &mut fs::File,
734 archive_offset: &mut u64,
735 progress: Option<&BackupProgress>,
736 ) -> Result<BackupContentEntry, BackupError> {
737 let info = storage
738 .get_pinned_info(cid)
739 .ok_or(StorageError::ContentNotFound {
740 cid: cid.to_string(),
741 })?;
742
743 let entry_offset = *archive_offset;
744 let mut chunk_checksums = Vec::new();
745
746 let count_bytes = info.chunk_count.to_le_bytes();
748 backup_file.write_all(&count_bytes).await?;
749 *archive_offset += 8;
750
751 let meta_json =
753 serde_json::to_vec(info).map_err(|e| BackupError::SerializationError(e.to_string()))?;
754 let meta_len = meta_json.len() as u32;
755 backup_file.write_all(&meta_len.to_le_bytes()).await?;
756 backup_file.write_all(&meta_json).await?;
757 *archive_offset += 4 + meta_json.len() as u64;
758
759 for chunk_idx in 0..info.chunk_count {
761 let chunk_data = storage.get_chunk(cid, chunk_idx).await?;
762 let chunk_hash = hash(&chunk_data);
763 chunk_checksums.push(hex::encode(chunk_hash));
764
765 let chunk_len = chunk_data.len() as u32;
767 backup_file.write_all(&chunk_len.to_le_bytes()).await?;
768 backup_file.write_all(&chunk_data).await?;
769 *archive_offset += 4 + chunk_data.len() as u64;
770
771 if let Some(p) = progress {
772 p.add_bytes(chunk_data.len() as u64);
773 }
774 }
775
776 Ok(BackupContentEntry {
777 cid: cid.to_string(),
778 chunk_count: info.chunk_count,
779 total_size: info.total_size,
780 chunk_checksums,
781 archive_offset: entry_offset,
782 })
783 }
784
785 async fn restore_content(
786 &self,
787 entry: &BackupContentEntry,
788 backup_file: &mut fs::File,
789 storage: &mut ChunkStorage,
790 progress: Option<&BackupProgress>,
791 ) -> Result<(u64, u64), BackupError> {
792 backup_file
794 .seek(std::io::SeekFrom::Start(entry.archive_offset))
795 .await?;
796
797 let mut count_bytes = [0u8; 8];
799 backup_file.read_exact(&mut count_bytes).await?;
800 let chunk_count = u64::from_le_bytes(count_bytes);
801
802 if chunk_count != entry.chunk_count {
803 return Err(BackupError::InvalidFormat(format!(
804 "Chunk count mismatch for {}: expected {}, got {}",
805 entry.cid, entry.chunk_count, chunk_count
806 )));
807 }
808
809 let mut meta_len_bytes = [0u8; 4];
811 backup_file.read_exact(&mut meta_len_bytes).await?;
812 let meta_len = u32::from_le_bytes(meta_len_bytes) as usize;
813
814 let mut meta_data = vec![0u8; meta_len];
815 backup_file.read_exact(&mut meta_data).await?;
816
817 let content_info: PinnedContentInfo = serde_json::from_slice(&meta_data)
818 .map_err(|e| BackupError::SerializationError(e.to_string()))?;
819
820 let mut chunks = Vec::new();
822 let mut total_bytes = 0u64;
823
824 for (idx, expected_checksum) in entry.chunk_checksums.iter().enumerate() {
825 let mut chunk_len_bytes = [0u8; 4];
826 backup_file.read_exact(&mut chunk_len_bytes).await?;
827 let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
828
829 let mut chunk_data = vec![0u8; chunk_len];
830 backup_file.read_exact(&mut chunk_data).await?;
831
832 if self.config.verify_on_restore {
834 let actual_checksum = hex::encode(hash(&chunk_data));
835 if &actual_checksum != expected_checksum {
836 return Err(BackupError::ChecksumMismatch {
837 expected: expected_checksum.clone(),
838 actual: actual_checksum,
839 });
840 }
841 }
842
843 total_bytes += chunk_data.len() as u64;
844 chunks.push(chunk_data);
845
846 if let Some(p) = progress {
847 p.add_bytes(chunk_len as u64);
848 }
849
850 debug!(
851 "Restored chunk {}/{} for {}",
852 idx + 1,
853 chunk_count,
854 entry.cid
855 );
856 }
857
858 storage
860 .pin_content(
861 &entry.cid,
862 &chunks,
863 &content_info.encryption_key,
864 &content_info.base_nonce,
865 )
866 .await?;
867
868 Ok((chunk_count, total_bytes))
869 }
870
871 fn create_manifest_data(&self, entries: &[BackupContentEntry]) -> Result<Vec<u8>, BackupError> {
872 let mut data = Vec::new();
874 for entry in entries {
875 data.extend_from_slice(entry.cid.as_bytes());
876 data.extend_from_slice(&entry.chunk_count.to_le_bytes());
877 data.extend_from_slice(&entry.total_size.to_le_bytes());
878 for checksum in &entry.chunk_checksums {
879 data.extend_from_slice(checksum.as_bytes());
880 }
881 }
882 Ok(data)
883 }
884}
885
/// Rules deciding which backup archives `apply_retention_policy` may delete.
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Minimum age, in days, a backup must exceed before it may be deleted.
    pub min_retention_days: u32,
    /// Number of newest full backups that are always kept.
    pub max_full_backups: usize,
    /// Number of newest incremental backups kept per retained full backup.
    pub max_incremental_per_full: usize,
}

impl Default for RetentionPolicy {
    /// Keeps backups for at least 30 days, up to 5 full backups, and up to 10
    /// incremental backups per full backup.
    fn default() -> Self {
        RetentionPolicy {
            max_incremental_per_full: 10,
            max_full_backups: 5,
            min_retention_days: 30,
        }
    }
}
906
907pub async fn apply_retention_policy(
909 backup_dir: &Path,
910 policy: &RetentionPolicy,
911) -> Result<Vec<PathBuf>, BackupError> {
912 let manager = BackupManager::new(BackupConfig::default());
913 let manifests = manager.list_backups(backup_dir).await?;
914
915 let mut to_delete = Vec::new();
916 let now = chrono::Utc::now();
917 let min_age = chrono::Duration::days(policy.min_retention_days as i64);
918
919 let mut full_backups: Vec<_> = manifests
921 .iter()
922 .filter(|m| m.backup_type == BackupType::Full)
923 .collect();
924
925 if full_backups.len() > policy.max_full_backups {
927 for manifest in full_backups.drain(policy.max_full_backups..) {
928 if now - manifest.created_at > min_age {
929 to_delete.push(backup_dir.join(format!("backup_{}.chie", manifest.backup_id)));
930 }
931 }
932 }
933
934 for full_manifest in &full_backups {
936 let incrementals: Vec<_> = manifests
937 .iter()
938 .filter(|m| {
939 m.backup_type == BackupType::Incremental
940 && m.parent_backup_id.as_ref() == Some(&full_manifest.backup_id)
941 })
942 .collect();
943
944 if incrementals.len() > policy.max_incremental_per_full {
945 for manifest in incrementals.iter().skip(policy.max_incremental_per_full) {
946 if now - manifest.created_at > min_age {
947 to_delete
948 .push(backup_dir.join(format!("backup_{}_incr.chie", manifest.backup_id)));
949 }
950 }
951 }
952 }
953
954 for path in &to_delete {
956 if path.exists() {
957 fs::remove_file(path).await?;
958 info!("Deleted old backup: {:?}", path);
959 }
960 }
961
962 Ok(to_delete)
963}
964
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_backup_config_default() {
        let config = BackupConfig::default();
        assert!(config.compress);
        assert!(config.verify_on_backup);
        assert!(config.verify_on_restore);
        assert!(config.include_metadata);
        assert_eq!(config.archive_chunk_size, 4 * 1024 * 1024);
    }

    #[test]
    fn test_progress_tracking() {
        let progress = BackupProgress::new();

        // With no total announced, the percentage must not divide by zero.
        assert_eq!(progress.percentage(), 0.0);

        progress.total_bytes.store(100, Ordering::Relaxed);
        progress.processed_bytes.store(50, Ordering::Relaxed);

        assert!((progress.percentage() - 50.0).abs() < 0.01);

        assert!(!progress.is_cancelled());
        progress.cancel();
        assert!(progress.is_cancelled());
    }

    #[test]
    fn test_retention_policy_default() {
        let policy = RetentionPolicy::default();
        assert_eq!(policy.min_retention_days, 30);
        assert_eq!(policy.max_full_backups, 5);
        assert_eq!(policy.max_incremental_per_full, 10);
    }

    #[tokio::test]
    async fn test_list_empty_backups() {
        let tmp = tempdir().unwrap();
        let manager = BackupManager::new(BackupConfig::default());
        let backups = manager.list_backups(tmp.path()).await.unwrap();
        assert!(backups.is_empty());
    }

    #[tokio::test]
    async fn test_list_backups_missing_directory() {
        let tmp = tempdir().unwrap();
        let missing = tmp.path().join("does-not-exist");
        let manager = BackupManager::new(BackupConfig::default());
        let backups = manager.list_backups(&missing).await.unwrap();
        assert!(backups.is_empty());
    }
}