allsource_core/infrastructure/persistence/
backup.rs1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
12use chrono::{DateTime, Utc};
13use flate2::{Compression, read::GzDecoder, write::GzEncoder};
14use serde::{Deserialize, Serialize};
15use std::{
16 fs::{self, File},
17 io::{Read, Write},
18 path::{Path, PathBuf},
19};
20use uuid::Uuid;
21
/// Descriptive record for one backup, persisted as a `<id>_metadata.json`
/// sidecar next to the compressed archive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    /// Unique id; also the file-name stem of the archive (`<id>.backup.gz`).
    pub backup_id: String,
    /// UTC timestamp taken when the backup was started.
    pub created_at: DateTime<Utc>,
    /// Full snapshot or incremental-from-a-parent backup.
    pub backup_type: BackupType,
    /// Number of events serialized into the archive.
    pub event_count: u64,
    /// On-disk size of the compressed archive, in bytes.
    pub size_bytes: u64,
    /// Hex SHA-256 digest of the compressed archive, used for verification.
    pub checksum: String,
    /// First sequence covered; `None` for a full backup (see `create_backup`).
    pub from_sequence: Option<u64>,
    /// Last sequence covered by this backup.
    pub to_sequence: u64,
    /// Whether the archive is gzip-compressed (always `true` for backups
    /// written by this module).
    pub compressed: bool,
}
35
/// Kind of backup an archive represents.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BackupType {
    /// Complete snapshot of all events.
    Full,
    /// Delta on top of an earlier backup, identified by its backup id.
    Incremental { from_backup_id: String },
}
42
/// Tunables for [`BackupManager`].
#[derive(Debug, Clone)]
pub struct BackupConfig {
    /// Directory where archives and metadata sidecars are written.
    pub backup_dir: PathBuf,
    /// gzip compression level used when writing archives.
    pub compression_level: Compression,
    /// When `true`, re-read and checksum each archive right after writing it.
    pub verify_after_backup: bool,
}
50
51impl Default for BackupConfig {
52 fn default() -> Self {
53 Self {
54 backup_dir: PathBuf::from("./backups"),
55 compression_level: Compression::default(),
56 verify_after_backup: true,
57 }
58 }
59}
60
/// Creates, verifies, lists, restores, and prunes gzip-compressed JSON
/// backups of events under a configured directory.
pub struct BackupManager {
    // Immutable after construction; `new` ensures `backup_dir` exists.
    config: BackupConfig,
}
65
66fn validate_backup_id(id: &str) -> Result<()> {
74 if id.is_empty() {
75 return Err(AllSourceError::ValidationError(
76 "backup id must not be empty".to_string(),
77 ));
78 }
79 if id.len() > 128 {
80 return Err(AllSourceError::ValidationError(
81 "backup id too long (max 128 chars)".to_string(),
82 ));
83 }
84 if !id
85 .chars()
86 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
87 {
88 return Err(AllSourceError::ValidationError(format!(
89 "backup id '{id}' contains invalid characters; only [A-Za-z0-9_-] allowed"
90 )));
91 }
92 Ok(())
93}
94
95impl BackupManager {
96 pub fn new(config: BackupConfig) -> Result<Self> {
97 fs::create_dir_all(&config.backup_dir).map_err(|e| {
99 AllSourceError::StorageError(format!("Failed to create backup dir: {e}"))
100 })?;
101
102 Ok(Self { config })
103 }
104
105 pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
107 let backup_id = format!("full_{}", Uuid::new_v4());
108 let timestamp = Utc::now();
109
110 tracing::info!("Creating backup: {}", backup_id);
111
112 let event_count = events.len() as u64;
113
114 if event_count == 0 {
115 return Err(AllSourceError::ValidationError(
116 "No events to backup".to_string(),
117 ));
118 }
119
120 let json_data = serde_json::to_string(&events)?;
122
123 let backup_path = self.get_backup_path(&backup_id);
125 let mut encoder = GzEncoder::new(
126 File::create(&backup_path).map_err(|e| {
127 AllSourceError::StorageError(format!("Failed to create backup file: {e}"))
128 })?,
129 self.config.compression_level,
130 );
131
132 encoder
133 .write_all(json_data.as_bytes())
134 .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {e}")))?;
135
136 encoder.finish().map_err(|e| {
137 AllSourceError::StorageError(format!("Failed to finish compression: {e}"))
138 })?;
139
140 let size_bytes = fs::metadata(&backup_path)
141 .map_err(|e| AllSourceError::StorageError(e.to_string()))?
142 .len();
143
144 let checksum = self.calculate_checksum(&backup_path)?;
145
146 let metadata = BackupMetadata {
147 backup_id: backup_id.clone(),
148 created_at: timestamp,
149 backup_type: BackupType::Full,
150 event_count,
151 size_bytes,
152 checksum,
153 from_sequence: None,
154 to_sequence: event_count,
155 compressed: true,
156 };
157
158 self.save_metadata(&metadata)?;
160
161 if self.config.verify_after_backup {
163 self.verify_backup(&metadata)?;
164 }
165
166 tracing::info!(
167 "Backup complete: {} events, {} bytes compressed",
168 event_count,
169 size_bytes
170 );
171
172 Ok(metadata)
173 }
174
175 pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
177 validate_backup_id(backup_id)?;
178 tracing::info!("Restoring from backup: {}", backup_id);
179
180 let metadata = self.load_metadata(backup_id)?;
181
182 self.verify_backup(&metadata)?;
184
185 let backup_path = self.get_backup_path(backup_id);
186
187 let file = File::open(&backup_path)
189 .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {e}")))?;
190
191 let mut decoder = GzDecoder::new(file);
192 let mut json_data = String::new();
193 decoder.read_to_string(&mut json_data).map_err(|e| {
194 AllSourceError::StorageError(format!("Failed to decompress backup: {e}"))
195 })?;
196
197 let events: Vec<Event> = serde_json::from_str(&json_data)?;
199
200 if events.len() != metadata.event_count as usize {
201 return Err(AllSourceError::ValidationError(format!(
202 "Event count mismatch: expected {}, got {}",
203 metadata.event_count,
204 events.len()
205 )));
206 }
207
208 tracing::info!("Restored {} events from backup", events.len());
209
210 Ok(events)
211 }
212
213 pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
215 let backup_path = self.get_backup_path(&metadata.backup_id);
216
217 if !backup_path.exists() {
218 return Err(AllSourceError::ValidationError(
219 "Backup file not found".to_string(),
220 ));
221 }
222
223 let checksum = self.calculate_checksum(&backup_path)?;
224
225 if checksum != metadata.checksum {
226 return Err(AllSourceError::ValidationError(format!(
227 "Checksum mismatch: expected {}, got {}",
228 metadata.checksum, checksum
229 )));
230 }
231
232 Ok(())
233 }
234
235 pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
237 let mut backups = Vec::new();
238
239 let entries = fs::read_dir(&self.config.backup_dir)
240 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
241
242 for entry in entries {
243 let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
244 let path = entry.path();
245
246 if path.extension().and_then(|s| s.to_str()) == Some("json")
247 && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
248 && let Some(backup_id) = stem.strip_suffix("_metadata")
249 && let Ok(metadata) = self.load_metadata(backup_id)
250 {
251 backups.push(metadata);
252 }
253 }
254
255 backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
257
258 Ok(backups)
259 }
260
261 pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
263 validate_backup_id(backup_id)?;
264 tracing::info!("Deleting backup: {}", backup_id);
265
266 let backup_path = self.get_backup_path(backup_id);
267 let metadata_path = self.get_metadata_path(backup_id);
268
269 if backup_path.exists() {
270 fs::remove_file(&backup_path)
271 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
272 }
273
274 if metadata_path.exists() {
275 fs::remove_file(&metadata_path)
276 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
277 }
278
279 Ok(())
280 }
281
282 pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
284 let mut backups = self.list_backups()?;
285
286 if backups.len() <= keep_count {
287 return Ok(0);
288 }
289
290 backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
292
293 let to_delete = backups.split_off(keep_count);
294 let delete_count = to_delete.len();
295
296 for backup in to_delete {
297 self.delete_backup(&backup.backup_id)?;
298 }
299
300 tracing::info!("Cleaned up {} old backups", delete_count);
301
302 Ok(delete_count)
303 }
304
305 fn get_backup_path(&self, backup_id: &str) -> PathBuf {
308 self.config
309 .backup_dir
310 .join(format!("{backup_id}.backup.gz"))
311 }
312
313 fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
314 self.config
315 .backup_dir
316 .join(format!("{backup_id}_metadata.json"))
317 }
318
319 fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
320 let path = self.get_metadata_path(&metadata.backup_id);
321 let json = serde_json::to_string_pretty(metadata)?;
322
323 fs::write(&path, json).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
324
325 Ok(())
326 }
327
328 fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
329 validate_backup_id(backup_id)?;
330 let path = self.get_metadata_path(backup_id);
331
332 if !path.exists() {
333 return Err(AllSourceError::ValidationError(
334 "Backup metadata not found".to_string(),
335 ));
336 }
337
338 let json =
339 fs::read_to_string(&path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
340
341 Ok(serde_json::from_str(&json)?)
342 }
343
344 fn calculate_checksum(&self, path: &Path) -> Result<String> {
345 use sha2::{Digest, Sha256};
346
347 let mut file = File::open(path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
348
349 let mut hasher = Sha256::new();
350 let mut buffer = [0; 8192];
351
352 loop {
353 let count = file
354 .read(&mut buffer)
355 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
356
357 if count == 0 {
358 break;
359 }
360
361 hasher.update(&buffer[..count]);
362 }
363
364 Ok(format!("{:x}", hasher.finalize()))
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    // Default config must opt into post-write verification.
    #[test]
    fn test_backup_config_default() {
        let config = BackupConfig::default();
        assert!(config.verify_after_backup);
    }

    // BackupType must round-trip through serde_json unchanged.
    #[test]
    fn test_backup_type_serialization() {
        let full = BackupType::Full;
        let json = serde_json::to_string(&full).unwrap();
        let deserialized: BackupType = serde_json::from_str(&json).unwrap();
        assert_eq!(full, deserialized);
    }

    // Ids built from [A-Za-z0-9_-] — including uuid-style ids — are accepted.
    #[test]
    fn test_validate_backup_id_accepts_valid() {
        assert!(validate_backup_id("full_a1b2c3").is_ok());
        assert!(validate_backup_id("incremental-2026-04-13").is_ok());
        assert!(validate_backup_id("A").is_ok());
        assert!(
            validate_backup_id("full_550e8400-e29b-41d4-a716-446655440000").is_ok(),
            "uuid-style ids with dashes should be accepted"
        );
    }

    // Anything usable for path traversal (separators, dots, NULs, spaces)
    // must be rejected, as must the empty id.
    #[test]
    fn test_validate_backup_id_rejects_traversal() {
        assert!(validate_backup_id("").is_err());
        assert!(validate_backup_id("../etc/passwd").is_err());
        assert!(validate_backup_id("..").is_err());
        assert!(validate_backup_id("foo/bar").is_err());
        assert!(validate_backup_id("foo\\bar").is_err());
        assert!(validate_backup_id("foo.bar").is_err(), "dots are rejected");
        assert!(validate_backup_id("foo\0bar").is_err());
        assert!(validate_backup_id("foo bar").is_err());
    }

    // 128 bytes is the inclusive maximum length; 129 is too long.
    #[test]
    fn test_validate_backup_id_length_limit() {
        let too_long = "a".repeat(129);
        assert!(validate_backup_id(&too_long).is_err());
        let max_ok = "a".repeat(128);
        assert!(validate_backup_id(&max_ok).is_ok());
    }
}