// allsource_core/infrastructure/persistence/backup.rs

1use crate::domain::entities::Event;
2/// Backup and restore system for AllSource event store
3///
4/// Features:
5/// - Full backups of all event data
6/// - Incremental backups from checkpoint
7/// - Compressed backup files (gzip)
8/// - Metadata tracking
9/// - Verification and integrity checks
10/// - Support for filesystem and S3-compatible storage
11use crate::error::{AllSourceError, Result};
12use chrono::{DateTime, Utc};
13use flate2::{Compression, read::GzDecoder, write::GzEncoder};
14use serde::{Deserialize, Serialize};
15use std::{
16    fs::{self, File},
17    io::{Read, Write},
18    path::{Path, PathBuf},
19};
20use uuid::Uuid;
21
22/// Backup metadata
/// Metadata describing a single backup archive, persisted as a JSON sidecar
/// file (`<backup_id>_metadata.json`) next to the compressed archive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    /// Identifier of the backup; also the filename stem of the archive.
    pub backup_id: String,
    /// UTC timestamp at which the backup was created.
    pub created_at: DateTime<Utc>,
    /// Full or incremental (with the base backup's id).
    pub backup_type: BackupType,
    /// Number of events stored in the archive.
    pub event_count: u64,
    /// Size of the compressed archive file on disk, in bytes.
    pub size_bytes: u64,
    /// SHA-256 hex digest of the compressed archive file.
    pub checksum: String,
    /// Starting sequence for incremental backups; `None` for full backups.
    pub from_sequence: Option<u64>,
    // NOTE(review): create_backup sets this to event_count, i.e. it assumes
    // the events form one contiguous stream from the start — confirm intent.
    pub to_sequence: u64,
    /// Whether the archive is gzip-compressed (always true for this writer).
    pub compressed: bool,
}
35
36/// Type of backup
/// Type of backup
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BackupType {
    /// A complete snapshot of all events.
    Full,
    /// A delta on top of an earlier backup, identified by `from_backup_id`.
    Incremental { from_backup_id: String },
}
42
43/// Backup configuration
/// Backup configuration
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Directory where archives and metadata sidecars are written.
    pub backup_dir: PathBuf,
    /// Gzip compression level used when writing archives.
    pub compression_level: Compression,
    /// If true, re-verify the archive checksum immediately after writing.
    pub verify_after_backup: bool,
}
50
51impl Default for BackupConfig {
52    fn default() -> Self {
53        Self {
54            backup_dir: PathBuf::from("./backups"),
55            compression_level: Compression::default(),
56            verify_after_backup: true,
57        }
58    }
59}
60
61/// Backup manager
62pub struct BackupManager {
63    config: BackupConfig,
64}
65
66/// Validate a backup identifier before it is joined into a filesystem path.
67///
68/// Backup IDs come from callers (CLI, embedded API, future HTTP handlers) and
69/// are concatenated into filenames under `backup_dir`. Without validation a
70/// caller could supply `../../etc/passwd` or similar and escape the backup
71/// directory. We accept only ASCII alphanumerics, `_`, `-`, and enforce a
72/// maximum length so the resulting path stays within the configured dir.
73fn validate_backup_id(id: &str) -> Result<()> {
74    if id.is_empty() {
75        return Err(AllSourceError::ValidationError(
76            "backup id must not be empty".to_string(),
77        ));
78    }
79    if id.len() > 128 {
80        return Err(AllSourceError::ValidationError(
81            "backup id too long (max 128 chars)".to_string(),
82        ));
83    }
84    if !id
85        .chars()
86        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
87    {
88        return Err(AllSourceError::ValidationError(format!(
89            "backup id '{id}' contains invalid characters; only [A-Za-z0-9_-] allowed"
90        )));
91    }
92    Ok(())
93}
94
95impl BackupManager {
96    pub fn new(config: BackupConfig) -> Result<Self> {
97        // Ensure backup directory exists
98        fs::create_dir_all(&config.backup_dir).map_err(|e| {
99            AllSourceError::StorageError(format!("Failed to create backup dir: {e}"))
100        })?;
101
102        Ok(Self { config })
103    }
104
105    /// Create a full backup from events
106    pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
107        let backup_id = format!("full_{}", Uuid::new_v4());
108        let timestamp = Utc::now();
109
110        tracing::info!("Creating backup: {}", backup_id);
111
112        let event_count = events.len() as u64;
113
114        if event_count == 0 {
115            return Err(AllSourceError::ValidationError(
116                "No events to backup".to_string(),
117            ));
118        }
119
120        // Serialize events to JSON
121        let json_data = serde_json::to_string(&events)?;
122
123        // Compress backup
124        let backup_path = self.get_backup_path(&backup_id);
125        let mut encoder = GzEncoder::new(
126            File::create(&backup_path).map_err(|e| {
127                AllSourceError::StorageError(format!("Failed to create backup file: {e}"))
128            })?,
129            self.config.compression_level,
130        );
131
132        encoder
133            .write_all(json_data.as_bytes())
134            .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {e}")))?;
135
136        encoder.finish().map_err(|e| {
137            AllSourceError::StorageError(format!("Failed to finish compression: {e}"))
138        })?;
139
140        let size_bytes = fs::metadata(&backup_path)
141            .map_err(|e| AllSourceError::StorageError(e.to_string()))?
142            .len();
143
144        let checksum = self.calculate_checksum(&backup_path)?;
145
146        let metadata = BackupMetadata {
147            backup_id: backup_id.clone(),
148            created_at: timestamp,
149            backup_type: BackupType::Full,
150            event_count,
151            size_bytes,
152            checksum,
153            from_sequence: None,
154            to_sequence: event_count,
155            compressed: true,
156        };
157
158        // Save metadata
159        self.save_metadata(&metadata)?;
160
161        // Verify if configured
162        if self.config.verify_after_backup {
163            self.verify_backup(&metadata)?;
164        }
165
166        tracing::info!(
167            "Backup complete: {} events, {} bytes compressed",
168            event_count,
169            size_bytes
170        );
171
172        Ok(metadata)
173    }
174
175    /// Restore from backup
176    pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
177        validate_backup_id(backup_id)?;
178        tracing::info!("Restoring from backup: {}", backup_id);
179
180        let metadata = self.load_metadata(backup_id)?;
181
182        // Verify backup integrity
183        self.verify_backup(&metadata)?;
184
185        let backup_path = self.get_backup_path(backup_id);
186
187        // Decompress backup
188        let file = File::open(&backup_path)
189            .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {e}")))?;
190
191        let mut decoder = GzDecoder::new(file);
192        let mut json_data = String::new();
193        decoder.read_to_string(&mut json_data).map_err(|e| {
194            AllSourceError::StorageError(format!("Failed to decompress backup: {e}"))
195        })?;
196
197        // Deserialize events
198        let events: Vec<Event> = serde_json::from_str(&json_data)?;
199
200        if events.len() != metadata.event_count as usize {
201            return Err(AllSourceError::ValidationError(format!(
202                "Event count mismatch: expected {}, got {}",
203                metadata.event_count,
204                events.len()
205            )));
206        }
207
208        tracing::info!("Restored {} events from backup", events.len());
209
210        Ok(events)
211    }
212
213    /// Verify backup integrity
214    pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
215        let backup_path = self.get_backup_path(&metadata.backup_id);
216
217        if !backup_path.exists() {
218            return Err(AllSourceError::ValidationError(
219                "Backup file not found".to_string(),
220            ));
221        }
222
223        let checksum = self.calculate_checksum(&backup_path)?;
224
225        if checksum != metadata.checksum {
226            return Err(AllSourceError::ValidationError(format!(
227                "Checksum mismatch: expected {}, got {}",
228                metadata.checksum, checksum
229            )));
230        }
231
232        Ok(())
233    }
234
235    /// List all backups
236    pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
237        let mut backups = Vec::new();
238
239        let entries = fs::read_dir(&self.config.backup_dir)
240            .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
241
242        for entry in entries {
243            let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
244            let path = entry.path();
245
246            if path.extension().and_then(|s| s.to_str()) == Some("json")
247                && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
248                && let Some(backup_id) = stem.strip_suffix("_metadata")
249                && let Ok(metadata) = self.load_metadata(backup_id)
250            {
251                backups.push(metadata);
252            }
253        }
254
255        // Sort by creation time, newest first
256        backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
257
258        Ok(backups)
259    }
260
261    /// Delete backup
262    pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
263        validate_backup_id(backup_id)?;
264        tracing::info!("Deleting backup: {}", backup_id);
265
266        let backup_path = self.get_backup_path(backup_id);
267        let metadata_path = self.get_metadata_path(backup_id);
268
269        if backup_path.exists() {
270            fs::remove_file(&backup_path)
271                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
272        }
273
274        if metadata_path.exists() {
275            fs::remove_file(&metadata_path)
276                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
277        }
278
279        Ok(())
280    }
281
282    /// Cleanup old backups (keep last N)
283    pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
284        let mut backups = self.list_backups()?;
285
286        if backups.len() <= keep_count {
287            return Ok(0);
288        }
289
290        // Sort by date, oldest last
291        backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
292
293        let to_delete = backups.split_off(keep_count);
294        let delete_count = to_delete.len();
295
296        for backup in to_delete {
297            self.delete_backup(&backup.backup_id)?;
298        }
299
300        tracing::info!("Cleaned up {} old backups", delete_count);
301
302        Ok(delete_count)
303    }
304
305    // Private helper methods
306
307    fn get_backup_path(&self, backup_id: &str) -> PathBuf {
308        self.config
309            .backup_dir
310            .join(format!("{backup_id}.backup.gz"))
311    }
312
313    fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
314        self.config
315            .backup_dir
316            .join(format!("{backup_id}_metadata.json"))
317    }
318
319    fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
320        let path = self.get_metadata_path(&metadata.backup_id);
321        let json = serde_json::to_string_pretty(metadata)?;
322
323        fs::write(&path, json).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
324
325        Ok(())
326    }
327
328    fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
329        validate_backup_id(backup_id)?;
330        let path = self.get_metadata_path(backup_id);
331
332        if !path.exists() {
333            return Err(AllSourceError::ValidationError(
334                "Backup metadata not found".to_string(),
335            ));
336        }
337
338        let json =
339            fs::read_to_string(&path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
340
341        Ok(serde_json::from_str(&json)?)
342    }
343
344    fn calculate_checksum(&self, path: &Path) -> Result<String> {
345        use sha2::{Digest, Sha256};
346
347        let mut file = File::open(path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
348
349        let mut hasher = Sha256::new();
350        let mut buffer = [0; 8192];
351
352        loop {
353            let count = file
354                .read(&mut buffer)
355                .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
356
357            if count == 0 {
358                break;
359            }
360
361            hasher.update(&buffer[..count]);
362        }
363
364        Ok(format!("{:x}", hasher.finalize()))
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults should have post-backup verification switched on.
    #[test]
    fn test_backup_config_default() {
        assert!(BackupConfig::default().verify_after_backup);
    }

    /// BackupType must survive a JSON round trip unchanged.
    #[test]
    fn test_backup_type_serialization() {
        let original = BackupType::Full;
        let round_tripped: BackupType =
            serde_json::from_str(&serde_json::to_string(&original).unwrap()).unwrap();
        assert_eq!(original, round_tripped);
    }

    /// Alphanumeric ids with underscores/dashes (including uuid-style ids)
    /// are accepted.
    #[test]
    fn test_validate_backup_id_accepts_valid() {
        let valid = [
            "full_a1b2c3",
            "incremental-2026-04-13",
            "A",
            "full_550e8400-e29b-41d4-a716-446655440000",
        ];
        for id in valid {
            assert!(validate_backup_id(id).is_ok(), "'{id}' should be accepted");
        }
    }

    /// Path-traversal attempts, separators, dots, NULs, and spaces are all
    /// rejected.
    #[test]
    fn test_validate_backup_id_rejects_traversal() {
        let invalid = [
            "",
            "../etc/passwd",
            "..",
            "foo/bar",
            "foo\\bar",
            "foo.bar",
            "foo\0bar",
            "foo bar",
        ];
        for id in invalid {
            assert!(validate_backup_id(id).is_err(), "'{id}' should be rejected");
        }
    }

    /// 128 characters is the inclusive maximum length.
    #[test]
    fn test_validate_backup_id_length_limit() {
        assert!(validate_backup_id(&"a".repeat(129)).is_err());
        assert!(validate_backup_id(&"a".repeat(128)).is_ok());
    }
}