1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
12use chrono::{DateTime, Utc};
13use flate2::{read::GzDecoder, write::GzEncoder, Compression};
14use serde::{Deserialize, Serialize};
15use std::fs::{self, File};
16use std::io::{Read, Write};
17use std::path::{Path, PathBuf};
18use uuid::Uuid;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct BackupMetadata {
23 pub backup_id: String,
24 pub created_at: DateTime<Utc>,
25 pub backup_type: BackupType,
26 pub event_count: u64,
27 pub size_bytes: u64,
28 pub checksum: String,
29 pub from_sequence: Option<u64>,
30 pub to_sequence: u64,
31 pub compressed: bool,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub enum BackupType {
37 Full,
38 Incremental { from_backup_id: String },
39}
40
41#[derive(Debug, Clone)]
43pub struct BackupConfig {
44 pub backup_dir: PathBuf,
45 pub compression_level: Compression,
46 pub verify_after_backup: bool,
47}
48
49impl Default for BackupConfig {
50 fn default() -> Self {
51 Self {
52 backup_dir: PathBuf::from("./backups"),
53 compression_level: Compression::default(),
54 verify_after_backup: true,
55 }
56 }
57}
58
59pub struct BackupManager {
61 config: BackupConfig,
62}
63
64impl BackupManager {
65 pub fn new(config: BackupConfig) -> Result<Self> {
66 fs::create_dir_all(&config.backup_dir).map_err(|e| {
68 AllSourceError::StorageError(format!("Failed to create backup dir: {}", e))
69 })?;
70
71 Ok(Self { config })
72 }
73
74 pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
76 let backup_id = format!("full_{}", Uuid::new_v4());
77 let timestamp = Utc::now();
78
79 tracing::info!("Creating backup: {}", backup_id);
80
81 let event_count = events.len() as u64;
82
83 if event_count == 0 {
84 return Err(AllSourceError::ValidationError(
85 "No events to backup".to_string(),
86 ));
87 }
88
89 let json_data = serde_json::to_string(&events)?;
91
92 let backup_path = self.get_backup_path(&backup_id);
94 let mut encoder = GzEncoder::new(
95 File::create(&backup_path).map_err(|e| {
96 AllSourceError::StorageError(format!("Failed to create backup file: {}", e))
97 })?,
98 self.config.compression_level,
99 );
100
101 encoder
102 .write_all(json_data.as_bytes())
103 .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {}", e)))?;
104
105 encoder.finish().map_err(|e| {
106 AllSourceError::StorageError(format!("Failed to finish compression: {}", e))
107 })?;
108
109 let size_bytes = fs::metadata(&backup_path)
110 .map_err(|e| AllSourceError::StorageError(e.to_string()))?
111 .len();
112
113 let checksum = self.calculate_checksum(&backup_path)?;
114
115 let metadata = BackupMetadata {
116 backup_id: backup_id.clone(),
117 created_at: timestamp,
118 backup_type: BackupType::Full,
119 event_count,
120 size_bytes,
121 checksum,
122 from_sequence: None,
123 to_sequence: event_count,
124 compressed: true,
125 };
126
127 self.save_metadata(&metadata)?;
129
130 if self.config.verify_after_backup {
132 self.verify_backup(&metadata)?;
133 }
134
135 tracing::info!(
136 "Backup complete: {} events, {} bytes compressed",
137 event_count,
138 size_bytes
139 );
140
141 Ok(metadata)
142 }
143
144 pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
146 tracing::info!("Restoring from backup: {}", backup_id);
147
148 let metadata = self.load_metadata(backup_id)?;
149
150 self.verify_backup(&metadata)?;
152
153 let backup_path = self.get_backup_path(backup_id);
154
155 let file = File::open(&backup_path)
157 .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {}", e)))?;
158
159 let mut decoder = GzDecoder::new(file);
160 let mut json_data = String::new();
161 decoder.read_to_string(&mut json_data).map_err(|e| {
162 AllSourceError::StorageError(format!("Failed to decompress backup: {}", e))
163 })?;
164
165 let events: Vec<Event> = serde_json::from_str(&json_data)?;
167
168 if events.len() != metadata.event_count as usize {
169 return Err(AllSourceError::ValidationError(format!(
170 "Event count mismatch: expected {}, got {}",
171 metadata.event_count,
172 events.len()
173 )));
174 }
175
176 tracing::info!("Restored {} events from backup", events.len());
177
178 Ok(events)
179 }
180
181 pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
183 let backup_path = self.get_backup_path(&metadata.backup_id);
184
185 if !backup_path.exists() {
186 return Err(AllSourceError::ValidationError(
187 "Backup file not found".to_string(),
188 ));
189 }
190
191 let checksum = self.calculate_checksum(&backup_path)?;
192
193 if checksum != metadata.checksum {
194 return Err(AllSourceError::ValidationError(format!(
195 "Checksum mismatch: expected {}, got {}",
196 metadata.checksum, checksum
197 )));
198 }
199
200 Ok(())
201 }
202
203 pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
205 let mut backups = Vec::new();
206
207 let entries = fs::read_dir(&self.config.backup_dir)
208 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
209
210 for entry in entries {
211 let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
212 let path = entry.path();
213
214 if path.extension().and_then(|s| s.to_str()) == Some("json") {
215 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
216 if let Some(backup_id) = stem.strip_suffix("_metadata") {
217 if let Ok(metadata) = self.load_metadata(backup_id) {
218 backups.push(metadata);
219 }
220 }
221 }
222 }
223 }
224
225 backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
227
228 Ok(backups)
229 }
230
231 pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
233 tracing::info!("Deleting backup: {}", backup_id);
234
235 let backup_path = self.get_backup_path(backup_id);
236 let metadata_path = self.get_metadata_path(backup_id);
237
238 if backup_path.exists() {
239 fs::remove_file(&backup_path)
240 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
241 }
242
243 if metadata_path.exists() {
244 fs::remove_file(&metadata_path)
245 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
246 }
247
248 Ok(())
249 }
250
251 pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
253 let mut backups = self.list_backups()?;
254
255 if backups.len() <= keep_count {
256 return Ok(0);
257 }
258
259 backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
261
262 let to_delete = backups.split_off(keep_count);
263 let delete_count = to_delete.len();
264
265 for backup in to_delete {
266 self.delete_backup(&backup.backup_id)?;
267 }
268
269 tracing::info!("Cleaned up {} old backups", delete_count);
270
271 Ok(delete_count)
272 }
273
274 fn get_backup_path(&self, backup_id: &str) -> PathBuf {
277 self.config
278 .backup_dir
279 .join(format!("{}.backup.gz", backup_id))
280 }
281
282 fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
283 self.config
284 .backup_dir
285 .join(format!("{}_metadata.json", backup_id))
286 }
287
288 fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
289 let path = self.get_metadata_path(&metadata.backup_id);
290 let json = serde_json::to_string_pretty(metadata)?;
291
292 fs::write(&path, json).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
293
294 Ok(())
295 }
296
297 fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
298 let path = self.get_metadata_path(backup_id);
299
300 if !path.exists() {
301 return Err(AllSourceError::ValidationError(
302 "Backup metadata not found".to_string(),
303 ));
304 }
305
306 let json =
307 fs::read_to_string(&path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
308
309 Ok(serde_json::from_str(&json)?)
310 }
311
312 fn calculate_checksum(&self, path: &Path) -> Result<String> {
313 use sha2::{Digest, Sha256};
314
315 let mut file = File::open(path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
316
317 let mut hasher = Sha256::new();
318 let mut buffer = [0; 8192];
319
320 loop {
321 let count = file
322 .read(&mut buffer)
323 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
324
325 if count == 0 {
326 break;
327 }
328
329 hasher.update(&buffer[..count]);
330 }
331
332 Ok(format!("{:x}", hasher.finalize()))
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager whose backup directory lives inside a fresh temporary
    /// directory. The returned `TempDir` must be kept alive for the duration
    /// of the test, because dropping it deletes the directory.
    fn manager_in_temp_dir() -> (TempDir, BackupManager) {
        let temp_dir = TempDir::new().unwrap();
        let config = BackupConfig {
            backup_dir: temp_dir.path().join("backups"),
            ..BackupConfig::default()
        };

        match BackupManager::new(config) {
            Ok(manager) => (temp_dir, manager),
            Err(_) => panic!("failed to create backup manager in temp dir"),
        }
    }

    #[test]
    fn test_backup_config_default() {
        let config = BackupConfig::default();
        assert!(config.verify_after_backup);
        assert_eq!(config.backup_dir, PathBuf::from("./backups"));
    }

    #[test]
    fn test_backup_type_serialization() {
        let full = BackupType::Full;
        let json = serde_json::to_string(&full).unwrap();
        let deserialized: BackupType = serde_json::from_str(&json).unwrap();
        assert_eq!(full, deserialized);
    }

    #[test]
    fn test_incremental_backup_type_serialization() {
        let incremental = BackupType::Incremental {
            from_backup_id: "full_base".to_string(),
        };
        let json = serde_json::to_string(&incremental).unwrap();
        let deserialized: BackupType = serde_json::from_str(&json).unwrap();
        assert_eq!(incremental, deserialized);
    }

    #[test]
    fn test_new_creates_backup_dir() {
        let temp_dir = TempDir::new().unwrap();
        let backup_dir = temp_dir.path().join("nested").join("backups");
        let config = BackupConfig {
            backup_dir: backup_dir.clone(),
            ..BackupConfig::default()
        };

        assert!(BackupManager::new(config).is_ok());
        assert!(backup_dir.is_dir());
    }

    #[test]
    fn test_create_backup_rejects_empty_input() {
        let (_temp_dir, manager) = manager_in_temp_dir();

        let result = manager.create_backup(&[]);

        assert!(matches!(result, Err(AllSourceError::ValidationError(_))));
        // A rejected backup must not show up in the listing.
        assert_eq!(manager.list_backups().ok().map(|b| b.len()), Some(0));
    }

    #[test]
    fn test_delete_missing_backup_is_ok() {
        let (_temp_dir, manager) = manager_in_temp_dir();
        assert!(manager.delete_backup("does_not_exist").is_ok());
    }

    #[test]
    fn test_cleanup_with_no_backups_deletes_nothing() {
        let (_temp_dir, manager) = manager_in_temp_dir();
        assert!(matches!(manager.cleanup_old_backups(3), Ok(0)));
    }
}