allsource_core/infrastructure/persistence/
backup.rs

1use crate::domain::entities::Event;
2/// Backup and restore system for AllSource event store
3///
4/// Features:
5/// - Full backups of all event data
6/// - Incremental backups from checkpoint
7/// - Compressed backup files (gzip)
8/// - Metadata tracking
9/// - Verification and integrity checks
10/// - Support for filesystem and S3-compatible storage
11use crate::error::{AllSourceError, Result};
12use chrono::{DateTime, Utc};
13use flate2::{read::GzDecoder, write::GzEncoder, Compression};
14use serde::{Deserialize, Serialize};
15use std::fs::{self, File};
16use std::io::{Read, Write};
17use std::path::{Path, PathBuf};
18use uuid::Uuid;
19
20/// Backup metadata
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct BackupMetadata {
23    pub backup_id: String,
24    pub created_at: DateTime<Utc>,
25    pub backup_type: BackupType,
26    pub event_count: u64,
27    pub size_bytes: u64,
28    pub checksum: String,
29    pub from_sequence: Option<u64>,
30    pub to_sequence: u64,
31    pub compressed: bool,
32}
33
34/// Type of backup
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub enum BackupType {
37    Full,
38    Incremental { from_backup_id: String },
39}
40
41/// Backup configuration
42#[derive(Debug, Clone)]
43pub struct BackupConfig {
44    pub backup_dir: PathBuf,
45    pub compression_level: Compression,
46    pub verify_after_backup: bool,
47}
48
49impl Default for BackupConfig {
50    fn default() -> Self {
51        Self {
52            backup_dir: PathBuf::from("./backups"),
53            compression_level: Compression::default(),
54            verify_after_backup: true,
55        }
56    }
57}
58
59/// Backup manager
60pub struct BackupManager {
61    config: BackupConfig,
62}
63
64impl BackupManager {
65    pub fn new(config: BackupConfig) -> Result<Self> {
66        // Ensure backup directory exists
67        fs::create_dir_all(&config.backup_dir).map_err(|e| {
68            AllSourceError::StorageError(format!("Failed to create backup dir: {}", e))
69        })?;
70
71        Ok(Self { config })
72    }
73
74    /// Create a full backup from events
75    pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
76        let backup_id = format!("full_{}", Uuid::new_v4());
77        let timestamp = Utc::now();
78
79        tracing::info!("Creating backup: {}", backup_id);
80
81        let event_count = events.len() as u64;
82
83        if event_count == 0 {
84            return Err(AllSourceError::ValidationError(
85                "No events to backup".to_string(),
86            ));
87        }
88
89        // Serialize events to JSON
90        let json_data = serde_json::to_string(&events)?;
91
92        // Compress backup
93        let backup_path = self.get_backup_path(&backup_id);
94        let mut encoder = GzEncoder::new(
95            File::create(&backup_path).map_err(|e| {
96                AllSourceError::StorageError(format!("Failed to create backup file: {}", e))
97            })?,
98            self.config.compression_level,
99        );
100
101        encoder
102            .write_all(json_data.as_bytes())
103            .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {}", e)))?;
104
105        encoder.finish().map_err(|e| {
106            AllSourceError::StorageError(format!("Failed to finish compression: {}", e))
107        })?;
108
109        let size_bytes = fs::metadata(&backup_path)
110            .map_err(|e| AllSourceError::StorageError(e.to_string()))?
111            .len();
112
113        let checksum = self.calculate_checksum(&backup_path)?;
114
115        let metadata = BackupMetadata {
116            backup_id: backup_id.clone(),
117            created_at: timestamp,
118            backup_type: BackupType::Full,
119            event_count,
120            size_bytes,
121            checksum,
122            from_sequence: None,
123            to_sequence: event_count,
124            compressed: true,
125        };
126
127        // Save metadata
128        self.save_metadata(&metadata)?;
129
130        // Verify if configured
131        if self.config.verify_after_backup {
132            self.verify_backup(&metadata)?;
133        }
134
135        tracing::info!(
136            "Backup complete: {} events, {} bytes compressed",
137            event_count,
138            size_bytes
139        );
140
141        Ok(metadata)
142    }
143
144    /// Restore from backup
145    pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
146        tracing::info!("Restoring from backup: {}", backup_id);
147
148        let metadata = self.load_metadata(backup_id)?;
149
150        // Verify backup integrity
151        self.verify_backup(&metadata)?;
152
153        let backup_path = self.get_backup_path(backup_id);
154
155        // Decompress backup
156        let file = File::open(&backup_path)
157            .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {}", e)))?;
158
159        let mut decoder = GzDecoder::new(file);
160        let mut json_data = String::new();
161        decoder.read_to_string(&mut json_data).map_err(|e| {
162            AllSourceError::StorageError(format!("Failed to decompress backup: {}", e))
163        })?;
164
165        // Deserialize events
166        let events: Vec<Event> = serde_json::from_str(&json_data)?;
167
168        if events.len() != metadata.event_count as usize {
169            return Err(AllSourceError::ValidationError(format!(
170                "Event count mismatch: expected {}, got {}",
171                metadata.event_count,
172                events.len()
173            )));
174        }
175
176        tracing::info!("Restored {} events from backup", events.len());
177
178        Ok(events)
179    }
180
181    /// Verify backup integrity
182    pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
183        let backup_path = self.get_backup_path(&metadata.backup_id);
184
185        if !backup_path.exists() {
186            return Err(AllSourceError::ValidationError(
187                "Backup file not found".to_string(),
188            ));
189        }
190
191        let checksum = self.calculate_checksum(&backup_path)?;
192
193        if checksum != metadata.checksum {
194            return Err(AllSourceError::ValidationError(format!(
195                "Checksum mismatch: expected {}, got {}",
196                metadata.checksum, checksum
197            )));
198        }
199
200        Ok(())
201    }
202
203    /// List all backups
204    pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
205        let mut backups = Vec::new();
206
207        let entries = fs::read_dir(&self.config.backup_dir)
208            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
209
210        for entry in entries {
211            let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
212            let path = entry.path();
213
214            if path.extension().and_then(|s| s.to_str()) == Some("json") {
215                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
216                    if let Some(backup_id) = stem.strip_suffix("_metadata") {
217                        if let Ok(metadata) = self.load_metadata(backup_id) {
218                            backups.push(metadata);
219                        }
220                    }
221                }
222            }
223        }
224
225        // Sort by creation time, newest first
226        backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
227
228        Ok(backups)
229    }
230
231    /// Delete backup
232    pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
233        tracing::info!("Deleting backup: {}", backup_id);
234
235        let backup_path = self.get_backup_path(backup_id);
236        let metadata_path = self.get_metadata_path(backup_id);
237
238        if backup_path.exists() {
239            fs::remove_file(&backup_path)
240                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
241        }
242
243        if metadata_path.exists() {
244            fs::remove_file(&metadata_path)
245                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
246        }
247
248        Ok(())
249    }
250
251    /// Cleanup old backups (keep last N)
252    pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
253        let mut backups = self.list_backups()?;
254
255        if backups.len() <= keep_count {
256            return Ok(0);
257        }
258
259        // Sort by date, oldest last
260        backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
261
262        let to_delete = backups.split_off(keep_count);
263        let delete_count = to_delete.len();
264
265        for backup in to_delete {
266            self.delete_backup(&backup.backup_id)?;
267        }
268
269        tracing::info!("Cleaned up {} old backups", delete_count);
270
271        Ok(delete_count)
272    }
273
274    // Private helper methods
275
276    fn get_backup_path(&self, backup_id: &str) -> PathBuf {
277        self.config
278            .backup_dir
279            .join(format!("{}.backup.gz", backup_id))
280    }
281
282    fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
283        self.config
284            .backup_dir
285            .join(format!("{}_metadata.json", backup_id))
286    }
287
288    fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
289        let path = self.get_metadata_path(&metadata.backup_id);
290        let json = serde_json::to_string_pretty(metadata)?;
291
292        fs::write(&path, json).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
293
294        Ok(())
295    }
296
297    fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
298        let path = self.get_metadata_path(backup_id);
299
300        if !path.exists() {
301            return Err(AllSourceError::ValidationError(
302                "Backup metadata not found".to_string(),
303            ));
304        }
305
306        let json =
307            fs::read_to_string(&path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
308
309        Ok(serde_json::from_str(&json)?)
310    }
311
312    fn calculate_checksum(&self, path: &Path) -> Result<String> {
313        use sha2::{Digest, Sha256};
314
315        let mut file = File::open(path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
316
317        let mut hasher = Sha256::new();
318        let mut buffer = [0; 8192];
319
320        loop {
321            let count = file
322                .read(&mut buffer)
323                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
324
325            if count == 0 {
326                break;
327            }
328
329            hasher.update(&buffer[..count]);
330        }
331
332        Ok(format!("{:x}", hasher.finalize()))
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339    use tempfile::TempDir;
340
341    #[test]
342    fn test_backup_config_default() {
343        let config = BackupConfig::default();
344        assert!(config.verify_after_backup);
345    }
346
347    #[test]
348    fn test_backup_type_serialization() {
349        let full = BackupType::Full;
350        let json = serde_json::to_string(&full).unwrap();
351        let deserialized: BackupType = serde_json::from_str(&json).unwrap();
352        assert_eq!(full, deserialized);
353    }
354}