// allsource_core/infrastructure/persistence/backup.rs
1use crate::domain::entities::Event;
2/// Backup and restore system for AllSource event store
3///
4/// Features:
5/// - Full backups of all event data
6/// - Incremental backups from checkpoint
7/// - Compressed backup files (gzip)
8/// - Metadata tracking
9/// - Verification and integrity checks
10/// - Support for filesystem and S3-compatible storage
11use crate::error::{AllSourceError, Result};
12use chrono::{DateTime, Utc};
13use flate2::{Compression, read::GzDecoder, write::GzEncoder};
14use serde::{Deserialize, Serialize};
15use std::{
16    fs::{self, File},
17    io::{Read, Write},
18    path::{Path, PathBuf},
19};
20use uuid::Uuid;
21
/// Backup metadata
///
/// Written as `<backup_id>_metadata.json` next to the compressed backup
/// file; records what the backup contains plus the integrity data used by
/// `verify_backup`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    /// Unique identifier; `create_backup` uses the form `full_<uuid>`.
    pub backup_id: String,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Full snapshot or incremental delta.
    pub backup_type: BackupType,
    /// Number of events serialized into the backup.
    pub event_count: u64,
    /// On-disk size of the (compressed) backup file in bytes.
    pub size_bytes: u64,
    /// SHA-256 of the backup file, lowercase hex.
    pub checksum: String,
    /// Starting sequence for incremental backups; `None` for full backups.
    pub from_sequence: Option<u64>,
    /// Upper sequence bound; `create_backup` sets this to `event_count`
    /// (NOTE(review): presumably the last event sequence — confirm with callers).
    pub to_sequence: u64,
    /// Whether the payload is gzip-compressed (always `true` for backups
    /// produced by `create_backup`).
    pub compressed: bool,
}
35
/// Type of backup
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BackupType {
    /// Complete snapshot of all event data.
    Full,
    /// Delta on top of an earlier backup, identified by `from_backup_id`.
    Incremental { from_backup_id: String },
}
42
/// Backup configuration
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Directory where backup and metadata files are stored; created on
    /// `BackupManager::new` if missing.
    pub backup_dir: PathBuf,
    /// gzip compression level passed to `GzEncoder`.
    pub compression_level: Compression,
    /// When `true`, `create_backup` re-reads and checksums the file it
    /// just wrote before returning.
    pub verify_after_backup: bool,
}
50
51impl Default for BackupConfig {
52    fn default() -> Self {
53        Self {
54            backup_dir: PathBuf::from("./backups"),
55            compression_level: Compression::default(),
56            verify_after_backup: true,
57        }
58    }
59}
60
/// Backup manager
///
/// Owns a `BackupConfig` and performs create/restore/verify/list/delete
/// operations against the configured backup directory.
pub struct BackupManager {
    config: BackupConfig,
}
65
66impl BackupManager {
67    pub fn new(config: BackupConfig) -> Result<Self> {
68        // Ensure backup directory exists
69        fs::create_dir_all(&config.backup_dir).map_err(|e| {
70            AllSourceError::StorageError(format!("Failed to create backup dir: {e}"))
71        })?;
72
73        Ok(Self { config })
74    }
75
76    /// Create a full backup from events
77    pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
78        let backup_id = format!("full_{}", Uuid::new_v4());
79        let timestamp = Utc::now();
80
81        tracing::info!("Creating backup: {}", backup_id);
82
83        let event_count = events.len() as u64;
84
85        if event_count == 0 {
86            return Err(AllSourceError::ValidationError(
87                "No events to backup".to_string(),
88            ));
89        }
90
91        // Serialize events to JSON
92        let json_data = serde_json::to_string(&events)?;
93
94        // Compress backup
95        let backup_path = self.get_backup_path(&backup_id);
96        let mut encoder = GzEncoder::new(
97            File::create(&backup_path).map_err(|e| {
98                AllSourceError::StorageError(format!("Failed to create backup file: {e}"))
99            })?,
100            self.config.compression_level,
101        );
102
103        encoder
104            .write_all(json_data.as_bytes())
105            .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {e}")))?;
106
107        encoder.finish().map_err(|e| {
108            AllSourceError::StorageError(format!("Failed to finish compression: {e}"))
109        })?;
110
111        let size_bytes = fs::metadata(&backup_path)
112            .map_err(|e| AllSourceError::StorageError(e.to_string()))?
113            .len();
114
115        let checksum = self.calculate_checksum(&backup_path)?;
116
117        let metadata = BackupMetadata {
118            backup_id: backup_id.clone(),
119            created_at: timestamp,
120            backup_type: BackupType::Full,
121            event_count,
122            size_bytes,
123            checksum,
124            from_sequence: None,
125            to_sequence: event_count,
126            compressed: true,
127        };
128
129        // Save metadata
130        self.save_metadata(&metadata)?;
131
132        // Verify if configured
133        if self.config.verify_after_backup {
134            self.verify_backup(&metadata)?;
135        }
136
137        tracing::info!(
138            "Backup complete: {} events, {} bytes compressed",
139            event_count,
140            size_bytes
141        );
142
143        Ok(metadata)
144    }
145
146    /// Restore from backup
147    pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
148        tracing::info!("Restoring from backup: {}", backup_id);
149
150        let metadata = self.load_metadata(backup_id)?;
151
152        // Verify backup integrity
153        self.verify_backup(&metadata)?;
154
155        let backup_path = self.get_backup_path(backup_id);
156
157        // Decompress backup
158        let file = File::open(&backup_path)
159            .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {e}")))?;
160
161        let mut decoder = GzDecoder::new(file);
162        let mut json_data = String::new();
163        decoder.read_to_string(&mut json_data).map_err(|e| {
164            AllSourceError::StorageError(format!("Failed to decompress backup: {e}"))
165        })?;
166
167        // Deserialize events
168        let events: Vec<Event> = serde_json::from_str(&json_data)?;
169
170        if events.len() != metadata.event_count as usize {
171            return Err(AllSourceError::ValidationError(format!(
172                "Event count mismatch: expected {}, got {}",
173                metadata.event_count,
174                events.len()
175            )));
176        }
177
178        tracing::info!("Restored {} events from backup", events.len());
179
180        Ok(events)
181    }
182
183    /// Verify backup integrity
184    pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
185        let backup_path = self.get_backup_path(&metadata.backup_id);
186
187        if !backup_path.exists() {
188            return Err(AllSourceError::ValidationError(
189                "Backup file not found".to_string(),
190            ));
191        }
192
193        let checksum = self.calculate_checksum(&backup_path)?;
194
195        if checksum != metadata.checksum {
196            return Err(AllSourceError::ValidationError(format!(
197                "Checksum mismatch: expected {}, got {}",
198                metadata.checksum, checksum
199            )));
200        }
201
202        Ok(())
203    }
204
205    /// List all backups
206    pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
207        let mut backups = Vec::new();
208
209        let entries = fs::read_dir(&self.config.backup_dir)
210            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
211
212        for entry in entries {
213            let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
214            let path = entry.path();
215
216            if path.extension().and_then(|s| s.to_str()) == Some("json")
217                && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
218                && let Some(backup_id) = stem.strip_suffix("_metadata")
219                && let Ok(metadata) = self.load_metadata(backup_id)
220            {
221                backups.push(metadata);
222            }
223        }
224
225        // Sort by creation time, newest first
226        backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
227
228        Ok(backups)
229    }
230
231    /// Delete backup
232    pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
233        tracing::info!("Deleting backup: {}", backup_id);
234
235        let backup_path = self.get_backup_path(backup_id);
236        let metadata_path = self.get_metadata_path(backup_id);
237
238        if backup_path.exists() {
239            fs::remove_file(&backup_path)
240                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
241        }
242
243        if metadata_path.exists() {
244            fs::remove_file(&metadata_path)
245                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
246        }
247
248        Ok(())
249    }
250
251    /// Cleanup old backups (keep last N)
252    pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
253        let mut backups = self.list_backups()?;
254
255        if backups.len() <= keep_count {
256            return Ok(0);
257        }
258
259        // Sort by date, oldest last
260        backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
261
262        let to_delete = backups.split_off(keep_count);
263        let delete_count = to_delete.len();
264
265        for backup in to_delete {
266            self.delete_backup(&backup.backup_id)?;
267        }
268
269        tracing::info!("Cleaned up {} old backups", delete_count);
270
271        Ok(delete_count)
272    }
273
274    // Private helper methods
275
276    fn get_backup_path(&self, backup_id: &str) -> PathBuf {
277        self.config
278            .backup_dir
279            .join(format!("{backup_id}.backup.gz"))
280    }
281
282    fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
283        self.config
284            .backup_dir
285            .join(format!("{backup_id}_metadata.json"))
286    }
287
288    fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
289        let path = self.get_metadata_path(&metadata.backup_id);
290        let json = serde_json::to_string_pretty(metadata)?;
291
292        fs::write(&path, json).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
293
294        Ok(())
295    }
296
297    fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
298        let path = self.get_metadata_path(backup_id);
299
300        if !path.exists() {
301            return Err(AllSourceError::ValidationError(
302                "Backup metadata not found".to_string(),
303            ));
304        }
305
306        let json =
307            fs::read_to_string(&path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
308
309        Ok(serde_json::from_str(&json)?)
310    }
311
312    fn calculate_checksum(&self, path: &Path) -> Result<String> {
313        use sha2::{Digest, Sha256};
314
315        let mut file = File::open(path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
316
317        let mut hasher = Sha256::new();
318        let mut buffer = [0; 8192];
319
320        loop {
321            let count = file
322                .read(&mut buffer)
323                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
324
325            if count == 0 {
326                break;
327            }
328
329            hasher.update(&buffer[..count]);
330        }
331
332        Ok(format!("{:x}", hasher.finalize()))
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backup_config_default() {
        // Post-backup verification should be on by default.
        assert!(BackupConfig::default().verify_after_backup);
    }

    #[test]
    fn test_backup_type_serialization() {
        // A JSON round-trip must preserve the variant.
        let original = BackupType::Full;
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: BackupType = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}