// chie_core/backup.rs

1//! Backup and restore functionality for CHIE storage.
2//!
3//! This module provides:
4//! - Full backup creation
5//! - Incremental backup support
6//! - Backup restoration with integrity verification
7//! - Progress tracking for long operations
8
9use crate::storage::{ChunkStorage, PinnedContentInfo, StorageError};
10use chie_crypto::hash;
11use serde::{Deserialize, Serialize};
12use std::collections::HashSet;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use thiserror::Error;
17use tokio::fs;
18use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
19use tracing::{debug, info, warn};
20
/// Backup-related errors.
///
/// Wraps I/O, serialization, and storage failures alongside backup-specific
/// conditions such as format/version mismatches and cancellation.
#[derive(Debug, Error)]
pub enum BackupError {
    /// Underlying filesystem or stream I/O failure.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),

    /// JSON (de)serialization of a manifest or content metadata failed.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// Error propagated from the chunk storage layer.
    #[error("Storage error: {0}")]
    StorageError(#[from] StorageError),

    /// The backup file at `path` does not exist.
    #[error("Backup not found: {path}")]
    BackupNotFound { path: String },

    /// The backup file is malformed (bad trailer, counts, or layout).
    #[error("Invalid backup format: {0}")]
    InvalidFormat(String),

    /// A chunk's recomputed checksum did not match the manifest entry.
    #[error("Checksum mismatch: expected {expected}, got {actual}")]
    ChecksumMismatch { expected: String, actual: String },

    /// The operation was cancelled via `BackupProgress::cancel`.
    #[error("Backup cancelled")]
    Cancelled,

    /// The backup was written with an unsupported format version.
    #[error("Incompatible backup version: {version}")]
    IncompatibleVersion { version: u32 },
}
48
/// Current backup format version; embedded in every manifest and checked
/// against exactly on restore (see `restore_backup`).
const BACKUP_VERSION: u32 = 1;
51
/// Backup configuration.
///
/// NOTE(review): within this module only `verify_on_restore` is consulted
/// (in `restore_content`); `compress`, `archive_chunk_size`,
/// `verify_on_backup`, and `include_metadata` appear unused here — confirm
/// whether they are honored elsewhere or still pending implementation.
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Enable compression for backup files.
    pub compress: bool,
    /// Chunk size for backup archive, in bytes (default 4MB).
    pub archive_chunk_size: usize,
    /// Verify checksums during backup.
    pub verify_on_backup: bool,
    /// Verify checksums during restore.
    pub verify_on_restore: bool,
    /// Include metadata in backups.
    pub include_metadata: bool,
}
66
67impl Default for BackupConfig {
68    fn default() -> Self {
69        Self {
70            compress: true,
71            archive_chunk_size: 4 * 1024 * 1024, // 4MB
72            verify_on_backup: true,
73            verify_on_restore: true,
74            include_metadata: true,
75        }
76    }
77}
78
/// Backup manifest describing the backup contents.
///
/// Serialized as pretty JSON at the tail of the archive, followed by its
/// byte length as a trailing little-endian `u64` (see `read_manifest`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Backup format version (see `BACKUP_VERSION`).
    pub version: u32,
    /// When the backup was created (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Type of backup (full or incremental).
    pub backup_type: BackupType,
    /// Previous backup ID for incremental backups; `None` for full backups.
    pub parent_backup_id: Option<String>,
    /// Unique backup identifier (a UUID string).
    pub backup_id: String,
    /// Content items included, in archive order.
    pub content_items: Vec<BackupContentEntry>,
    /// Total size of backed-up content, in bytes.
    pub total_size: u64,
    /// Hex checksum over the deterministic entry encoding produced by
    /// `create_manifest_data` — not over the raw archive bytes.
    pub checksum: String,
    /// Source storage base path the backup was taken from.
    pub source_path: String,
}
101
/// Type of backup.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackupType {
    /// Full backup of all pinned content.
    Full,
    /// Incremental backup: only content added since the parent backup
    /// (identified by `BackupManifest::parent_backup_id`).
    Incremental,
}
110
/// Entry for each content item in the backup.
///
/// Each entry points at a region of the archive laid out (by
/// `backup_content`) as: chunk count (LE u64), metadata length (LE u32),
/// metadata JSON, then per chunk: length (LE u32) followed by chunk bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupContentEntry {
    /// Content CID.
    pub cid: String,
    /// Number of chunks.
    pub chunk_count: u64,
    /// Total size, in bytes.
    pub total_size: u64,
    /// Hex chunk checksums for verification, one per chunk in order.
    pub chunk_checksums: Vec<String>,
    /// Byte offset of this entry's region within the backup archive.
    pub archive_offset: u64,
}
125
/// Progress tracking for backup/restore operations.
///
/// All fields are `Arc`-shared so a clone of the tracker can be observed
/// from another task while an operation runs. Counters use relaxed atomics:
/// readings are approximate snapshots, not synchronization points.
#[derive(Debug, Clone)]
pub struct BackupProgress {
    /// Total bytes to process.
    pub total_bytes: Arc<AtomicU64>,
    /// Bytes processed so far.
    pub processed_bytes: Arc<AtomicU64>,
    /// Total items to process.
    pub total_items: Arc<AtomicU64>,
    /// Items processed so far.
    pub processed_items: Arc<AtomicU64>,
    /// Human-readable description of the current step.
    pub current_operation: Arc<std::sync::RwLock<String>>,
    /// Cooperative cancellation flag; polled between items by long operations.
    pub cancelled: Arc<AtomicBool>,
}
142
impl Default for BackupProgress {
    // Delegates to `new()` so both construction paths share one initializer.
    fn default() -> Self {
        Self::new()
    }
}
148
149impl BackupProgress {
150    /// Create a new progress tracker.
151    #[must_use]
152    #[inline]
153    pub fn new() -> Self {
154        Self {
155            total_bytes: Arc::new(AtomicU64::new(0)),
156            processed_bytes: Arc::new(AtomicU64::new(0)),
157            total_items: Arc::new(AtomicU64::new(0)),
158            processed_items: Arc::new(AtomicU64::new(0)),
159            current_operation: Arc::new(std::sync::RwLock::new(String::new())),
160            cancelled: Arc::new(AtomicBool::new(false)),
161        }
162    }
163
164    /// Get progress percentage (0-100).
165    #[must_use]
166    #[inline]
167    pub fn percentage(&self) -> f64 {
168        let total = self.total_bytes.load(Ordering::Relaxed);
169        if total == 0 {
170            return 0.0;
171        }
172        let processed = self.processed_bytes.load(Ordering::Relaxed);
173        (processed as f64 / total as f64) * 100.0
174    }
175
176    /// Check if operation is cancelled.
177    #[must_use]
178    #[inline]
179    pub fn is_cancelled(&self) -> bool {
180        self.cancelled.load(Ordering::Relaxed)
181    }
182
183    /// Cancel the operation.
184    #[inline]
185    pub fn cancel(&self) {
186        self.cancelled.store(true, Ordering::Relaxed);
187    }
188
189    /// Set current operation description.
190    #[inline]
191    pub fn set_operation(&self, op: &str) {
192        if let Ok(mut guard) = self.current_operation.write() {
193            *guard = op.to_string();
194        }
195    }
196
197    /// Add processed bytes.
198    #[inline]
199    pub fn add_bytes(&self, bytes: u64) {
200        self.processed_bytes.fetch_add(bytes, Ordering::Relaxed);
201    }
202
203    /// Increment processed items.
204    #[inline]
205    pub fn increment_items(&self) {
206        self.processed_items.fetch_add(1, Ordering::Relaxed);
207    }
208}
209
/// Result of a backup operation.
#[derive(Debug, Clone)]
pub struct BackupResult {
    /// Manifest describing what was backed up.
    pub manifest: BackupManifest,
    /// Path to the backup file.
    /// NOTE(review): for an incremental backup with no changes this path is
    /// reported even though no file was written — check `items_backed_up`.
    pub backup_path: PathBuf,
    /// Wall-clock duration of the backup, in seconds.
    pub duration_secs: f64,
    /// Number of content items backed up.
    pub items_backed_up: usize,
}
222
/// Result of a restore operation.
///
/// A restore can partially succeed: per-item failures are collected in
/// `failed_items` rather than aborting the whole operation.
#[derive(Debug, Clone)]
pub struct RestoreResult {
    /// Number of content items restored.
    pub items_restored: usize,
    /// Number of chunks restored.
    pub chunks_restored: u64,
    /// Total bytes restored.
    pub bytes_restored: u64,
    /// Wall-clock duration of the restore, in seconds.
    pub duration_secs: f64,
    /// CIDs of items that failed to restore.
    pub failed_items: Vec<String>,
}
237
/// Backup manager for creating and restoring backups.
///
/// Stateless apart from its configuration; one manager can be reused for
/// any number of backup/restore operations.
pub struct BackupManager {
    /// Backup configuration.
    config: BackupConfig,
}
243
244impl BackupManager {
    /// Create a new backup manager with the given configuration.
    ///
    /// NOTE(review): only `config.verify_on_restore` appears to be consulted
    /// within this module — confirm the remaining fields are used elsewhere.
    #[must_use]
    #[inline]
    pub fn new(config: BackupConfig) -> Self {
        Self { config }
    }
251
    /// Create a full backup of the storage.
    ///
    /// Every pinned content item is serialized into a single archive file,
    /// `backup_<uuid>.chie`, inside `backup_dir`. The archive layout is:
    /// a 1 KiB zeroed header region, then one region per content item
    /// (written by `backup_content`), then the pretty-printed JSON manifest,
    /// then the manifest byte length as a trailing little-endian `u64`.
    ///
    /// # Errors
    ///
    /// Returns `BackupError::Cancelled` (after removing the partial file)
    /// when `progress` is cancelled between items, or any I/O, storage, or
    /// serialization error encountered along the way.
    pub async fn create_full_backup(
        &self,
        storage: &ChunkStorage,
        backup_dir: &Path,
        progress: Option<&BackupProgress>,
    ) -> Result<BackupResult, BackupError> {
        let start = std::time::Instant::now();

        // Ensure the destination directory exists before creating the file.
        fs::create_dir_all(backup_dir).await?;

        let backup_id = uuid::Uuid::new_v4().to_string();
        let backup_path = backup_dir.join(format!("backup_{}.chie", backup_id));

        info!("Creating full backup: {}", backup_id);

        if let Some(p) = progress {
            p.set_operation("Preparing backup");
        }

        // Collect the set of pinned content to back up.
        let pinned_cids = storage.list_pinned();
        let mut content_entries = Vec::new();
        let mut total_size = 0u64;

        if let Some(p) = progress {
            p.total_items
                .store(pinned_cids.len() as u64, Ordering::Relaxed);
        }

        // Pre-compute the total byte count so percentage() is meaningful.
        for cid in &pinned_cids {
            if let Some(info) = storage.get_pinned_info(cid) {
                total_size += info.total_size;
            }
        }

        if let Some(p) = progress {
            p.total_bytes.store(total_size, Ordering::Relaxed);
        }

        // Create backup file
        let mut backup_file = fs::File::create(&backup_path).await?;
        let mut archive_offset = 0u64;

        // Reserve a fixed 1 KiB zeroed header region at the start of the
        // archive. NOTE(review): nothing ever seeks back to fill this in;
        // readers locate the manifest from the file tail instead — confirm
        // whether this region is reserved for a future format revision.
        let header_placeholder = vec![0u8; 1024];
        backup_file.write_all(&header_placeholder).await?;
        archive_offset += header_placeholder.len() as u64;

        // Backup each content item
        for cid in pinned_cids {
            if let Some(p) = progress {
                if p.is_cancelled() {
                    // Clean up the partial backup; removal is best-effort.
                    let _ = fs::remove_file(&backup_path).await;
                    return Err(BackupError::Cancelled);
                }
                p.set_operation(&format!("Backing up {}", cid));
            }

            let entry = self
                .backup_content(
                    storage,
                    cid,
                    &mut backup_file,
                    &mut archive_offset,
                    progress,
                )
                .await?;

            content_entries.push(entry);

            if let Some(p) = progress {
                p.increment_items();
            }
        }

        // The manifest checksum covers a deterministic encoding of the
        // entries, not the raw archive bytes (see create_manifest_data).
        let manifest_data = self.create_manifest_data(&content_entries)?;
        let checksum = hex::encode(hash(&manifest_data));

        let manifest = BackupManifest {
            version: BACKUP_VERSION,
            created_at: chrono::Utc::now(),
            backup_type: BackupType::Full,
            parent_backup_id: None,
            backup_id: backup_id.clone(),
            content_items: content_entries.clone(),
            total_size,
            checksum,
            source_path: storage.base_path().to_string_lossy().to_string(),
        };

        // Write manifest at the end
        let manifest_json = serde_json::to_vec_pretty(&manifest)
            .map_err(|e| BackupError::SerializationError(e.to_string()))?;

        backup_file.write_all(&manifest_json).await?;

        // Trailer: manifest byte length, read back by read_manifest().
        let manifest_len = manifest_json.len() as u64;
        backup_file.write_all(&manifest_len.to_le_bytes()).await?;

        // Flush buffered writes and fsync so the backup survives a crash.
        backup_file.flush().await?;
        backup_file.sync_all().await?;

        let duration = start.elapsed().as_secs_f64();

        info!(
            "Backup complete: {} items in {:.2}s",
            content_entries.len(),
            duration
        );

        Ok(BackupResult {
            manifest,
            backup_path,
            duration_secs: duration,
            items_backed_up: content_entries.len(),
        })
    }
376
    /// Create an incremental backup.
    ///
    /// Backs up only the content that is pinned now but was NOT present in
    /// `parent_manifest` (set difference on CIDs). NOTE(review): content
    /// removed since the parent is not recorded anywhere — confirm that
    /// deletion tracking is intentionally out of scope.
    ///
    /// If nothing changed, no file is written and a result with an empty
    /// manifest is returned; see the note on `BackupResult::backup_path`.
    ///
    /// # Errors
    ///
    /// Same failure modes as `create_full_backup`.
    pub async fn create_incremental_backup(
        &self,
        storage: &ChunkStorage,
        backup_dir: &Path,
        parent_manifest: &BackupManifest,
        progress: Option<&BackupProgress>,
    ) -> Result<BackupResult, BackupError> {
        let start = std::time::Instant::now();

        let backup_id = uuid::Uuid::new_v4().to_string();
        let backup_path = backup_dir.join(format!("backup_{}_incr.chie", backup_id));

        info!(
            "Creating incremental backup: {} (parent: {})",
            backup_id, parent_manifest.backup_id
        );

        if let Some(p) = progress {
            p.set_operation("Analyzing changes");
        }

        // Build set of existing content from parent
        let parent_cids: HashSet<_> = parent_manifest
            .content_items
            .iter()
            .map(|e| e.cid.clone())
            .collect();

        // Find new/changed content
        let current_cids: HashSet<_> = storage
            .list_pinned()
            .into_iter()
            .map(String::from)
            .collect();
        let new_cids: Vec<_> = current_cids.difference(&parent_cids).cloned().collect();

        if new_cids.is_empty() {
            info!("No changes detected, skipping backup");
            // Note: backup_path is returned even though no file was created.
            return Ok(BackupResult {
                manifest: BackupManifest {
                    version: BACKUP_VERSION,
                    created_at: chrono::Utc::now(),
                    backup_type: BackupType::Incremental,
                    parent_backup_id: Some(parent_manifest.backup_id.clone()),
                    backup_id,
                    content_items: vec![],
                    total_size: 0,
                    checksum: String::new(),
                    source_path: storage.base_path().to_string_lossy().to_string(),
                },
                backup_path,
                duration_secs: start.elapsed().as_secs_f64(),
                items_backed_up: 0,
            });
        }

        // Create backup file
        fs::create_dir_all(backup_dir).await?;
        let mut backup_file = fs::File::create(&backup_path).await?;
        let mut archive_offset = 0u64;

        // Reserved 1 KiB zeroed header region — same layout as full backups;
        // see the note in create_full_backup (it is never rewritten).
        let header_placeholder = vec![0u8; 1024];
        backup_file.write_all(&header_placeholder).await?;
        archive_offset += header_placeholder.len() as u64;

        let mut content_entries = Vec::new();
        let mut total_size = 0u64;

        if let Some(p) = progress {
            p.total_items
                .store(new_cids.len() as u64, Ordering::Relaxed);
        }

        // Backup only new content.
        // NOTE(review): unlike create_full_backup, progress.total_bytes is
        // never stored on this path, so percentage() stays at 0 — confirm
        // whether byte-level progress is intended for incrementals.
        for cid in &new_cids {
            if let Some(p) = progress {
                if p.is_cancelled() {
                    let _ = fs::remove_file(&backup_path).await;
                    return Err(BackupError::Cancelled);
                }
                p.set_operation(&format!("Backing up {}", cid));
            }

            if let Some(info) = storage.get_pinned_info(cid) {
                total_size += info.total_size;
            }

            let entry = self
                .backup_content(
                    storage,
                    cid,
                    &mut backup_file,
                    &mut archive_offset,
                    progress,
                )
                .await?;

            content_entries.push(entry);

            if let Some(p) = progress {
                p.increment_items();
            }
        }

        // Create manifest (checksum covers the deterministic entry encoding).
        let manifest_data = self.create_manifest_data(&content_entries)?;
        let checksum = hex::encode(hash(&manifest_data));

        let manifest = BackupManifest {
            version: BACKUP_VERSION,
            created_at: chrono::Utc::now(),
            backup_type: BackupType::Incremental,
            parent_backup_id: Some(parent_manifest.backup_id.clone()),
            backup_id: backup_id.clone(),
            content_items: content_entries.clone(),
            total_size,
            checksum,
            source_path: storage.base_path().to_string_lossy().to_string(),
        };

        // Write manifest followed by its LE u64 length trailer.
        let manifest_json = serde_json::to_vec_pretty(&manifest)
            .map_err(|e| BackupError::SerializationError(e.to_string()))?;
        backup_file.write_all(&manifest_json).await?;

        let manifest_len = manifest_json.len() as u64;
        backup_file.write_all(&manifest_len.to_le_bytes()).await?;

        backup_file.flush().await?;
        backup_file.sync_all().await?;

        let duration = start.elapsed().as_secs_f64();

        info!(
            "Incremental backup complete: {} items in {:.2}s",
            content_entries.len(),
            duration
        );

        Ok(BackupResult {
            manifest,
            backup_path,
            duration_secs: duration,
            items_backed_up: content_entries.len(),
        })
    }
525
    /// Restore from a backup file.
    ///
    /// Reads the manifest from the archive tail, checks the format version,
    /// then restores each content item via `restore_content`. Items that
    /// fail are logged, collected in `RestoreResult::failed_items`, and do
    /// not abort the rest of the restore.
    ///
    /// # Errors
    ///
    /// Returns `BackupError::BackupNotFound` when `backup_path` does not
    /// exist, `BackupError::IncompatibleVersion` on a version mismatch,
    /// `BackupError::Cancelled` if cancelled between items, or any I/O or
    /// manifest-parsing error.
    pub async fn restore_backup(
        &self,
        backup_path: &Path,
        storage: &mut ChunkStorage,
        progress: Option<&BackupProgress>,
    ) -> Result<RestoreResult, BackupError> {
        let start = std::time::Instant::now();

        if !backup_path.exists() {
            return Err(BackupError::BackupNotFound {
                path: backup_path.to_string_lossy().to_string(),
            });
        }

        info!("Restoring from backup: {:?}", backup_path);

        if let Some(p) = progress {
            p.set_operation("Reading backup manifest");
        }

        // Read manifest from end of file
        let manifest = self.read_manifest(backup_path).await?;

        // Only the exact current format version is accepted.
        if manifest.version != BACKUP_VERSION {
            return Err(BackupError::IncompatibleVersion {
                version: manifest.version,
            });
        }

        if let Some(p) = progress {
            p.total_items
                .store(manifest.content_items.len() as u64, Ordering::Relaxed);
            p.total_bytes.store(manifest.total_size, Ordering::Relaxed);
        }

        let mut items_restored = 0;
        let mut chunks_restored = 0u64;
        let mut bytes_restored = 0u64;
        let mut failed_items = Vec::new();

        // Open backup file for reading
        let mut backup_file = fs::File::open(backup_path).await?;

        // Restore each content item; failures are per-item, not fatal.
        for entry in &manifest.content_items {
            if let Some(p) = progress {
                if p.is_cancelled() {
                    return Err(BackupError::Cancelled);
                }
                p.set_operation(&format!("Restoring {}", entry.cid));
            }

            match self
                .restore_content(entry, &mut backup_file, storage, progress)
                .await
            {
                Ok((chunks, bytes)) => {
                    items_restored += 1;
                    chunks_restored += chunks;
                    bytes_restored += bytes;
                }
                Err(e) => {
                    warn!("Failed to restore {}: {}", entry.cid, e);
                    failed_items.push(entry.cid.clone());
                }
            }

            if let Some(p) = progress {
                p.increment_items();
            }
        }

        let duration = start.elapsed().as_secs_f64();

        info!(
            "Restore complete: {} items, {} chunks, {} bytes in {:.2}s",
            items_restored, chunks_restored, bytes_restored, duration
        );

        Ok(RestoreResult {
            items_restored,
            chunks_restored,
            bytes_restored,
            duration_secs: duration,
            failed_items,
        })
    }
614
615    /// Read backup manifest from a backup file.
616    pub async fn read_manifest(&self, backup_path: &Path) -> Result<BackupManifest, BackupError> {
617        let mut file = fs::File::open(backup_path).await?;
618
619        // Read manifest length from end
620        let file_size = file.metadata().await?.len();
621        file.seek(std::io::SeekFrom::End(-8)).await?;
622
623        let mut len_bytes = [0u8; 8];
624        file.read_exact(&mut len_bytes).await?;
625        let manifest_len = u64::from_le_bytes(len_bytes) as usize;
626
627        // Read manifest
628        let manifest_start = file_size - 8 - manifest_len as u64;
629        file.seek(std::io::SeekFrom::Start(manifest_start)).await?;
630
631        let mut manifest_data = vec![0u8; manifest_len];
632        file.read_exact(&mut manifest_data).await?;
633
634        let manifest: BackupManifest = serde_json::from_slice(&manifest_data)
635            .map_err(|e| BackupError::SerializationError(e.to_string()))?;
636
637        Ok(manifest)
638    }
639
640    /// List available backups in a directory.
641    pub async fn list_backups(
642        &self,
643        backup_dir: &Path,
644    ) -> Result<Vec<BackupManifest>, BackupError> {
645        let mut manifests = Vec::new();
646
647        if !backup_dir.exists() {
648            return Ok(manifests);
649        }
650
651        let mut entries = fs::read_dir(backup_dir).await?;
652        while let Some(entry) = entries.next_entry().await? {
653            let path = entry.path();
654            if path.extension().is_some_and(|ext| ext == "chie") {
655                match self.read_manifest(&path).await {
656                    Ok(manifest) => manifests.push(manifest),
657                    Err(e) => {
658                        debug!("Skipping invalid backup {:?}: {}", path, e);
659                    }
660                }
661            }
662        }
663
664        // Sort by creation time (newest first)
665        manifests.sort_by(|a, b| b.created_at.cmp(&a.created_at));
666
667        Ok(manifests)
668    }
669
670    /// Verify a backup file integrity.
671    pub async fn verify_backup(
672        &self,
673        backup_path: &Path,
674        progress: Option<&BackupProgress>,
675    ) -> Result<bool, BackupError> {
676        if let Some(p) = progress {
677            p.set_operation("Verifying backup integrity");
678        }
679
680        let manifest = self.read_manifest(backup_path).await?;
681
682        // Verify manifest checksum
683        let manifest_data = self.create_manifest_data(&manifest.content_items)?;
684        let computed_checksum = hex::encode(hash(&manifest_data));
685
686        if computed_checksum != manifest.checksum {
687            return Ok(false);
688        }
689
690        // Verify each content entry exists and has correct checksums
691        let mut file = fs::File::open(backup_path).await?;
692
693        if let Some(p) = progress {
694            p.total_items
695                .store(manifest.content_items.len() as u64, Ordering::Relaxed);
696        }
697
698        for entry in &manifest.content_items {
699            if let Some(p) = progress {
700                if p.is_cancelled() {
701                    return Err(BackupError::Cancelled);
702                }
703                p.set_operation(&format!("Verifying {}", entry.cid));
704            }
705
706            // Seek to entry offset and verify chunks exist
707            file.seek(std::io::SeekFrom::Start(entry.archive_offset))
708                .await?;
709
710            // Read and verify chunk count
711            let mut count_bytes = [0u8; 8];
712            file.read_exact(&mut count_bytes).await?;
713            let stored_count = u64::from_le_bytes(count_bytes);
714
715            if stored_count != entry.chunk_count {
716                return Ok(false);
717            }
718
719            if let Some(p) = progress {
720                p.increment_items();
721            }
722        }
723
724        Ok(true)
725    }
726
727    // Helper methods
728
    /// Serialize one pinned content item into the archive.
    ///
    /// On-disk layout appended at `*archive_offset`:
    /// 1. chunk count — LE `u64` (8 bytes)
    /// 2. metadata length — LE `u32` (4 bytes), then the metadata JSON
    /// 3. per chunk: length — LE `u32` (4 bytes), then the chunk bytes
    ///
    /// `archive_offset` is advanced past everything written; the returned
    /// entry records the region's start offset and per-chunk checksums used
    /// later by `restore_content` and `verify_backup`.
    async fn backup_content(
        &self,
        storage: &ChunkStorage,
        cid: &str,
        backup_file: &mut fs::File,
        archive_offset: &mut u64,
        progress: Option<&BackupProgress>,
    ) -> Result<BackupContentEntry, BackupError> {
        let info = storage
            .get_pinned_info(cid)
            .ok_or(StorageError::ContentNotFound {
                cid: cid.to_string(),
            })?;

        // Remember where this entry's region starts for the manifest.
        let entry_offset = *archive_offset;
        let mut chunk_checksums = Vec::new();

        // Write chunk count
        let count_bytes = info.chunk_count.to_le_bytes();
        backup_file.write_all(&count_bytes).await?;
        *archive_offset += 8;

        // Write content metadata (length-prefixed JSON).
        let meta_json =
            serde_json::to_vec(info).map_err(|e| BackupError::SerializationError(e.to_string()))?;
        let meta_len = meta_json.len() as u32;
        backup_file.write_all(&meta_len.to_le_bytes()).await?;
        backup_file.write_all(&meta_json).await?;
        *archive_offset += 4 + meta_json.len() as u64;

        // Write each chunk, hashing as we go so restore can verify.
        for chunk_idx in 0..info.chunk_count {
            let chunk_data = storage.get_chunk(cid, chunk_idx).await?;
            let chunk_hash = hash(&chunk_data);
            chunk_checksums.push(hex::encode(chunk_hash));

            // Write chunk length and data
            let chunk_len = chunk_data.len() as u32;
            backup_file.write_all(&chunk_len.to_le_bytes()).await?;
            backup_file.write_all(&chunk_data).await?;
            *archive_offset += 4 + chunk_data.len() as u64;

            if let Some(p) = progress {
                p.add_bytes(chunk_data.len() as u64);
            }
        }

        Ok(BackupContentEntry {
            cid: cid.to_string(),
            chunk_count: info.chunk_count,
            total_size: info.total_size,
            chunk_checksums,
            archive_offset: entry_offset,
        })
    }
784
785    async fn restore_content(
786        &self,
787        entry: &BackupContentEntry,
788        backup_file: &mut fs::File,
789        storage: &mut ChunkStorage,
790        progress: Option<&BackupProgress>,
791    ) -> Result<(u64, u64), BackupError> {
792        // Seek to entry offset
793        backup_file
794            .seek(std::io::SeekFrom::Start(entry.archive_offset))
795            .await?;
796
797        // Read chunk count
798        let mut count_bytes = [0u8; 8];
799        backup_file.read_exact(&mut count_bytes).await?;
800        let chunk_count = u64::from_le_bytes(count_bytes);
801
802        if chunk_count != entry.chunk_count {
803            return Err(BackupError::InvalidFormat(format!(
804                "Chunk count mismatch for {}: expected {}, got {}",
805                entry.cid, entry.chunk_count, chunk_count
806            )));
807        }
808
809        // Read content metadata
810        let mut meta_len_bytes = [0u8; 4];
811        backup_file.read_exact(&mut meta_len_bytes).await?;
812        let meta_len = u32::from_le_bytes(meta_len_bytes) as usize;
813
814        let mut meta_data = vec![0u8; meta_len];
815        backup_file.read_exact(&mut meta_data).await?;
816
817        let content_info: PinnedContentInfo = serde_json::from_slice(&meta_data)
818            .map_err(|e| BackupError::SerializationError(e.to_string()))?;
819
820        // Read all chunks
821        let mut chunks = Vec::new();
822        let mut total_bytes = 0u64;
823
824        for (idx, expected_checksum) in entry.chunk_checksums.iter().enumerate() {
825            let mut chunk_len_bytes = [0u8; 4];
826            backup_file.read_exact(&mut chunk_len_bytes).await?;
827            let chunk_len = u32::from_le_bytes(chunk_len_bytes) as usize;
828
829            let mut chunk_data = vec![0u8; chunk_len];
830            backup_file.read_exact(&mut chunk_data).await?;
831
832            // Verify checksum if enabled
833            if self.config.verify_on_restore {
834                let actual_checksum = hex::encode(hash(&chunk_data));
835                if &actual_checksum != expected_checksum {
836                    return Err(BackupError::ChecksumMismatch {
837                        expected: expected_checksum.clone(),
838                        actual: actual_checksum,
839                    });
840                }
841            }
842
843            total_bytes += chunk_data.len() as u64;
844            chunks.push(chunk_data);
845
846            if let Some(p) = progress {
847                p.add_bytes(chunk_len as u64);
848            }
849
850            debug!(
851                "Restored chunk {}/{} for {}",
852                idx + 1,
853                chunk_count,
854                entry.cid
855            );
856        }
857
858        // Pin the content in storage
859        storage
860            .pin_content(
861                &entry.cid,
862                &chunks,
863                &content_info.encryption_key,
864                &content_info.base_nonce,
865            )
866            .await?;
867
868        Ok((chunk_count, total_bytes))
869    }
870
871    fn create_manifest_data(&self, entries: &[BackupContentEntry]) -> Result<Vec<u8>, BackupError> {
872        // Create a deterministic representation for checksum
873        let mut data = Vec::new();
874        for entry in entries {
875            data.extend_from_slice(entry.cid.as_bytes());
876            data.extend_from_slice(&entry.chunk_count.to_le_bytes());
877            data.extend_from_slice(&entry.total_size.to_le_bytes());
878            for checksum in &entry.chunk_checksums {
879                data.extend_from_slice(checksum.as_bytes());
880            }
881        }
882        Ok(data)
883    }
884}
885
/// Retention policy for managing backup history.
///
/// Applied by `apply_retention_policy`: backups beyond the count caps are
/// deleted only once they are older than `min_retention_days`.
#[derive(Debug, Clone)]
pub struct RetentionPolicy {
    /// Keep backups for at least this many days, even past the count caps.
    pub min_retention_days: u32,
    /// Maximum number of full backups to keep.
    pub max_full_backups: usize,
    /// Maximum number of incremental backups per full backup.
    pub max_incremental_per_full: usize,
}
896
897impl Default for RetentionPolicy {
898    fn default() -> Self {
899        Self {
900            min_retention_days: 30,
901            max_full_backups: 5,
902            max_incremental_per_full: 10,
903        }
904    }
905}
906
907/// Apply retention policy to backup directory.
908pub async fn apply_retention_policy(
909    backup_dir: &Path,
910    policy: &RetentionPolicy,
911) -> Result<Vec<PathBuf>, BackupError> {
912    let manager = BackupManager::new(BackupConfig::default());
913    let manifests = manager.list_backups(backup_dir).await?;
914
915    let mut to_delete = Vec::new();
916    let now = chrono::Utc::now();
917    let min_age = chrono::Duration::days(policy.min_retention_days as i64);
918
919    // Group by full backup
920    let mut full_backups: Vec<_> = manifests
921        .iter()
922        .filter(|m| m.backup_type == BackupType::Full)
923        .collect();
924
925    // Keep only max_full_backups
926    if full_backups.len() > policy.max_full_backups {
927        for manifest in full_backups.drain(policy.max_full_backups..) {
928            if now - manifest.created_at > min_age {
929                to_delete.push(backup_dir.join(format!("backup_{}.chie", manifest.backup_id)));
930            }
931        }
932    }
933
934    // For each remaining full backup, limit incrementals
935    for full_manifest in &full_backups {
936        let incrementals: Vec<_> = manifests
937            .iter()
938            .filter(|m| {
939                m.backup_type == BackupType::Incremental
940                    && m.parent_backup_id.as_ref() == Some(&full_manifest.backup_id)
941            })
942            .collect();
943
944        if incrementals.len() > policy.max_incremental_per_full {
945            for manifest in incrementals.iter().skip(policy.max_incremental_per_full) {
946                if now - manifest.created_at > min_age {
947                    to_delete
948                        .push(backup_dir.join(format!("backup_{}_incr.chie", manifest.backup_id)));
949                }
950            }
951        }
952    }
953
954    // Delete old backups
955    for path in &to_delete {
956        if path.exists() {
957            fs::remove_file(path).await?;
958            info!("Deleted old backup: {:?}", path);
959        }
960    }
961
962    Ok(to_delete)
963}
964
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Defaults keep both verification passes and compression enabled.
    #[tokio::test]
    async fn test_backup_config_default() {
        let cfg = BackupConfig::default();
        assert!(cfg.compress);
        assert!(cfg.verify_on_backup && cfg.verify_on_restore);
    }

    /// Byte counters drive percentage(); cancel() is observable.
    #[tokio::test]
    async fn test_progress_tracking() {
        let tracker = BackupProgress::new();
        tracker.total_bytes.store(100, Ordering::Relaxed);
        tracker.processed_bytes.store(50, Ordering::Relaxed);

        let pct = tracker.percentage();
        assert!((pct - 50.0).abs() < 0.01);

        tracker.cancel();
        assert!(tracker.is_cancelled());
    }

    /// Retention defaults: 30 days, 5 fulls, 10 incrementals per full.
    #[tokio::test]
    async fn test_retention_policy_default() {
        let policy = RetentionPolicy::default();
        assert_eq!(policy.min_retention_days, 30);
        assert_eq!(policy.max_full_backups, 5);
        assert_eq!(policy.max_incremental_per_full, 10);
    }

    /// An empty directory yields an empty backup listing.
    #[tokio::test]
    async fn test_list_empty_backups() {
        let dir = tempdir().unwrap();
        let manager = BackupManager::new(BackupConfig::default());
        let listed = manager.list_backups(dir.path()).await.unwrap();
        assert!(listed.is_empty());
    }
}
1005}