allsource_core/backup.rs

//! Backup and restore system for AllSource event store
//!
//! Features:
//! - Full backups of all event data
//! - Incremental backups from checkpoint
//! - Compressed backup files (gzip)
//! - Metadata tracking
//! - Verification and integrity checks
//! - Support for filesystem and S3-compatible storage
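//!
//! # Example
//!
//! A minimal sketch of the backup/restore flow. The crate paths and the
//! source of `events` are assumptions for illustration, so the snippet is
//! marked `ignore`.
//!
//! ```ignore
//! use allsource_core::backup::{BackupConfig, BackupManager};
//!
//! let manager = BackupManager::new(BackupConfig::default())?;
//!
//! // `events` is a Vec<Event> previously read from the event store (not shown).
//! let metadata = manager.create_backup(&events)?;
//! println!("backup {} wrote {} bytes", metadata.backup_id, metadata.size_bytes);
//!
//! // Later, restore the same events from that backup.
//! let restored = manager.restore_from_backup(&metadata.backup_id)?;
//! assert_eq!(restored.len(), metadata.event_count as usize);
//! ```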

use crate::error::{AllSourceError, Result};
use crate::domain::entities::Event;
use chrono::{DateTime, Utc};
use flate2::{write::GzEncoder, read::GzDecoder, Compression};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use uuid::Uuid;

/// Backup metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    pub backup_id: String,
    pub created_at: DateTime<Utc>,
    pub backup_type: BackupType,
    pub event_count: u64,
    pub size_bytes: u64,
    pub checksum: String,
    pub from_sequence: Option<u64>,
    pub to_sequence: u64,
    pub compressed: bool,
}

/// Type of backup
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BackupType {
    Full,
    Incremental { from_backup_id: String },
}

/// Backup configuration
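///
/// # Example
///
/// A sketch of a customized configuration; the directory is an arbitrary
/// choice and `Compression::best()` trades CPU time for smaller archives.
///
/// ```ignore
/// use flate2::Compression;
/// use std::path::PathBuf;
///
/// let config = BackupConfig {
///     backup_dir: PathBuf::from("/var/lib/allsource/backups"),
///     compression_level: Compression::best(),
///     verify_after_backup: true,
/// };
/// let manager = BackupManager::new(config)?;
/// ```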
#[derive(Debug, Clone)]
pub struct BackupConfig {
    pub backup_dir: PathBuf,
    pub compression_level: Compression,
    pub verify_after_backup: bool,
}

impl Default for BackupConfig {
    fn default() -> Self {
        Self {
            backup_dir: PathBuf::from("./backups"),
            compression_level: Compression::default(),
            verify_after_backup: true,
        }
    }
}

/// Backup manager
pub struct BackupManager {
    config: BackupConfig,
}

impl BackupManager {
    pub fn new(config: BackupConfig) -> Result<Self> {
        // Ensure backup directory exists
        fs::create_dir_all(&config.backup_dir)
            .map_err(|e| AllSourceError::StorageError(format!("Failed to create backup dir: {}", e)))?;

        Ok(Self { config })
    }

    /// Create a full backup from events
    pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
        let backup_id = format!("full_{}", Uuid::new_v4());
        let timestamp = Utc::now();

        tracing::info!("Creating backup: {}", backup_id);

        let event_count = events.len() as u64;

        if event_count == 0 {
            return Err(AllSourceError::ValidationError(
                "No events to backup".to_string(),
            ));
        }

        // Serialize events to JSON
        let json_data = serde_json::to_string(&events)?;

        // Compress backup
        let backup_path = self.get_backup_path(&backup_id);
        let mut encoder = GzEncoder::new(
            File::create(&backup_path)
                .map_err(|e| AllSourceError::StorageError(format!("Failed to create backup file: {}", e)))?,
            self.config.compression_level,
        );

        encoder
            .write_all(json_data.as_bytes())
            .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {}", e)))?;

        encoder
            .finish()
            .map_err(|e| AllSourceError::StorageError(format!("Failed to finish compression: {}", e)))?;

        let size_bytes = fs::metadata(&backup_path)
            .map_err(|e| AllSourceError::StorageError(e.to_string()))?
            .len();

        let checksum = self.calculate_checksum(&backup_path)?;

        let metadata = BackupMetadata {
            backup_id: backup_id.clone(),
            created_at: timestamp,
            backup_type: BackupType::Full,
            event_count,
            size_bytes,
            checksum,
            from_sequence: None,
            to_sequence: event_count,
            compressed: true,
        };

        // Save metadata
        self.save_metadata(&metadata)?;

        // Verify if configured
        if self.config.verify_after_backup {
            self.verify_backup(&metadata)?;
        }

        tracing::info!(
            "Backup complete: {} events, {} bytes compressed",
            event_count,
            size_bytes
        );

        Ok(metadata)
    }

    /// Restore from backup
    pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
        tracing::info!("Restoring from backup: {}", backup_id);

        let metadata = self.load_metadata(backup_id)?;

        // Verify backup integrity
        self.verify_backup(&metadata)?;

        let backup_path = self.get_backup_path(backup_id);

        // Decompress backup
        let file = File::open(&backup_path)
            .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {}", e)))?;

        let mut decoder = GzDecoder::new(file);
        let mut json_data = String::new();
        decoder
            .read_to_string(&mut json_data)
            .map_err(|e| AllSourceError::StorageError(format!("Failed to decompress backup: {}", e)))?;

        // Deserialize events
        let events: Vec<Event> = serde_json::from_str(&json_data)?;

        if events.len() != metadata.event_count as usize {
            return Err(AllSourceError::ValidationError(format!(
                "Event count mismatch: expected {}, got {}",
                metadata.event_count,
                events.len()
            )));
        }

        tracing::info!("Restored {} events from backup", events.len());

        Ok(events)
    }

    /// Verify backup integrity
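    ///
    /// # Example
    ///
    /// A sketch of a periodic integrity sweep over every known backup; the
    /// surrounding `manager` value and error handling are assumed.
    ///
    /// ```ignore
    /// for metadata in manager.list_backups()? {
    ///     manager.verify_backup(&metadata)?;
    /// }
    /// ```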
    pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
        let backup_path = self.get_backup_path(&metadata.backup_id);

        if !backup_path.exists() {
            return Err(AllSourceError::ValidationError(
                "Backup file not found".to_string(),
            ));
        }

        let checksum = self.calculate_checksum(&backup_path)?;

        if checksum != metadata.checksum {
            return Err(AllSourceError::ValidationError(format!(
                "Checksum mismatch: expected {}, got {}",
                metadata.checksum, checksum
            )));
        }

        Ok(())
    }

    /// List all backups
    pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
        let mut backups = Vec::new();

        let entries = fs::read_dir(&self.config.backup_dir)
            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;

        for entry in entries {
            let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
            let path = entry.path();

            if path.extension().and_then(|s| s.to_str()) == Some("json") {
                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                    if let Some(backup_id) = stem.strip_suffix("_metadata") {
                        if let Ok(metadata) = self.load_metadata(backup_id) {
                            backups.push(metadata);
                        }
                    }
                }
            }
        }

        // Sort by creation time, newest first
        backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        Ok(backups)
    }

    /// Delete backup
    pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
        tracing::info!("Deleting backup: {}", backup_id);

        let backup_path = self.get_backup_path(backup_id);
        let metadata_path = self.get_metadata_path(backup_id);

        if backup_path.exists() {
            fs::remove_file(&backup_path)
                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
        }

        if metadata_path.exists() {
            fs::remove_file(&metadata_path)
                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
        }

        Ok(())
    }

    /// Cleanup old backups (keep last N)
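    ///
    /// # Example
    ///
    /// A sketch of a retention pass that keeps the seven most recent backups;
    /// the count is an arbitrary choice for illustration.
    ///
    /// ```ignore
    /// let deleted = manager.cleanup_old_backups(7)?;
    /// tracing::info!("retention pass removed {} backups", deleted);
    /// ```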
    pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
        let mut backups = self.list_backups()?;

        if backups.len() <= keep_count {
            return Ok(0);
        }

        // Sort newest first; everything after the first `keep_count` entries is oldest and gets deleted
        backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        let to_delete = backups.split_off(keep_count);
        let delete_count = to_delete.len();

        for backup in to_delete {
            self.delete_backup(&backup.backup_id)?;
        }

        tracing::info!("Cleaned up {} old backups", delete_count);

        Ok(delete_count)
    }

    // Private helper methods

    fn get_backup_path(&self, backup_id: &str) -> PathBuf {
        self.config
            .backup_dir
            .join(format!("{}.backup.gz", backup_id))
    }

    fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
        self.config
            .backup_dir
            .join(format!("{}_metadata.json", backup_id))
    }

    fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
        let path = self.get_metadata_path(&metadata.backup_id);
        let json = serde_json::to_string_pretty(metadata)?;

        fs::write(&path, json)
            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;

        Ok(())
    }

    fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
        let path = self.get_metadata_path(backup_id);

        if !path.exists() {
            return Err(AllSourceError::ValidationError(
                "Backup metadata not found".to_string(),
            ));
        }

        let json = fs::read_to_string(&path)
            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;

        Ok(serde_json::from_str(&json)?)
    }

    fn calculate_checksum(&self, path: &Path) -> Result<String> {
        use sha2::{Sha256, Digest};

        let mut file = File::open(path)
            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;

        let mut hasher = Sha256::new();
        let mut buffer = [0; 8192];

        loop {
            let count = file
                .read(&mut buffer)
                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;

            if count == 0 {
                break;
            }

            hasher.update(&buffer[..count]);
        }

        Ok(format!("{:x}", hasher.finalize()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_backup_config_default() {
        let config = BackupConfig::default();
        assert!(config.verify_after_backup);
    }

    #[test]
    fn test_backup_type_serialization() {
        let full = BackupType::Full;
        let json = serde_json::to_string(&full).unwrap();
        let deserialized: BackupType = serde_json::from_str(&json).unwrap();
        assert_eq!(full, deserialized);
    }
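
    // A sketch of a filesystem-level test that avoids constructing `Event`
    // values (their shape is defined elsewhere). It assumes `AllSourceError`
    // implements `Debug`, as error types conventionally do.
    #[test]
    fn test_backup_manager_with_temp_dir() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = BackupConfig {
            backup_dir: temp_dir.path().to_path_buf(),
            ..BackupConfig::default()
        };
        let manager = BackupManager::new(config).expect("create backup manager");

        // A fresh backup directory lists no backups and has nothing to clean up.
        assert!(manager.list_backups().expect("list backups").is_empty());
        assert_eq!(manager.cleanup_old_backups(5).expect("cleanup"), 0);

        // Backing up an empty event slice is rejected up front.
        assert!(manager.create_backup(&[]).is_err());
    }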
}