1use crate::error::{AllSourceError, Result};
12use crate::domain::entities::Event;
13use chrono::{DateTime, Utc};
14use flate2::{write::GzEncoder, read::GzDecoder, Compression};
15use serde::{Deserialize, Serialize};
16use std::fs::{self, File};
17use std::io::{Read, Write};
18use std::path::{Path, PathBuf};
19use uuid::Uuid;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct BackupMetadata {
24 pub backup_id: String,
25 pub created_at: DateTime<Utc>,
26 pub backup_type: BackupType,
27 pub event_count: u64,
28 pub size_bytes: u64,
29 pub checksum: String,
30 pub from_sequence: Option<u64>,
31 pub to_sequence: u64,
32 pub compressed: bool,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37pub enum BackupType {
38 Full,
39 Incremental { from_backup_id: String },
40}
41
42#[derive(Debug, Clone)]
44pub struct BackupConfig {
45 pub backup_dir: PathBuf,
46 pub compression_level: Compression,
47 pub verify_after_backup: bool,
48}
49
50impl Default for BackupConfig {
51 fn default() -> Self {
52 Self {
53 backup_dir: PathBuf::from("./backups"),
54 compression_level: Compression::default(),
55 verify_after_backup: true,
56 }
57 }
58}
59
60pub struct BackupManager {
62 config: BackupConfig,
63}
64
65impl BackupManager {
66 pub fn new(config: BackupConfig) -> Result<Self> {
67 fs::create_dir_all(&config.backup_dir)
69 .map_err(|e| AllSourceError::StorageError(format!("Failed to create backup dir: {}", e)))?;
70
71 Ok(Self { config })
72 }
73
74 pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
76 let backup_id = format!("full_{}", Uuid::new_v4());
77 let timestamp = Utc::now();
78
79 tracing::info!("Creating backup: {}", backup_id);
80
81 let event_count = events.len() as u64;
82
83 if event_count == 0 {
84 return Err(AllSourceError::ValidationError(
85 "No events to backup".to_string(),
86 ));
87 }
88
89 let json_data = serde_json::to_string(&events)?;
91
92 let backup_path = self.get_backup_path(&backup_id);
94 let mut encoder = GzEncoder::new(
95 File::create(&backup_path)
96 .map_err(|e| AllSourceError::StorageError(format!("Failed to create backup file: {}", e)))?,
97 self.config.compression_level,
98 );
99
100 encoder
101 .write_all(json_data.as_bytes())
102 .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {}", e)))?;
103
104 encoder
105 .finish()
106 .map_err(|e| AllSourceError::StorageError(format!("Failed to finish compression: {}", e)))?;
107
108 let size_bytes = fs::metadata(&backup_path)
109 .map_err(|e| AllSourceError::StorageError(e.to_string()))?
110 .len();
111
112 let checksum = self.calculate_checksum(&backup_path)?;
113
114 let metadata = BackupMetadata {
115 backup_id: backup_id.clone(),
116 created_at: timestamp,
117 backup_type: BackupType::Full,
118 event_count,
119 size_bytes,
120 checksum,
121 from_sequence: None,
122 to_sequence: event_count,
123 compressed: true,
124 };
125
126 self.save_metadata(&metadata)?;
128
129 if self.config.verify_after_backup {
131 self.verify_backup(&metadata)?;
132 }
133
134 tracing::info!(
135 "Backup complete: {} events, {} bytes compressed",
136 event_count,
137 size_bytes
138 );
139
140 Ok(metadata)
141 }
142
143 pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
145 tracing::info!("Restoring from backup: {}", backup_id);
146
147 let metadata = self.load_metadata(backup_id)?;
148
149 self.verify_backup(&metadata)?;
151
152 let backup_path = self.get_backup_path(backup_id);
153
154 let file = File::open(&backup_path)
156 .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {}", e)))?;
157
158 let mut decoder = GzDecoder::new(file);
159 let mut json_data = String::new();
160 decoder
161 .read_to_string(&mut json_data)
162 .map_err(|e| AllSourceError::StorageError(format!("Failed to decompress backup: {}", e)))?;
163
164 let events: Vec<Event> = serde_json::from_str(&json_data)?;
166
167 if events.len() != metadata.event_count as usize {
168 return Err(AllSourceError::ValidationError(format!(
169 "Event count mismatch: expected {}, got {}",
170 metadata.event_count,
171 events.len()
172 )));
173 }
174
175 tracing::info!("Restored {} events from backup", events.len());
176
177 Ok(events)
178 }
179
180 pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
182 let backup_path = self.get_backup_path(&metadata.backup_id);
183
184 if !backup_path.exists() {
185 return Err(AllSourceError::ValidationError(
186 "Backup file not found".to_string(),
187 ));
188 }
189
190 let checksum = self.calculate_checksum(&backup_path)?;
191
192 if checksum != metadata.checksum {
193 return Err(AllSourceError::ValidationError(format!(
194 "Checksum mismatch: expected {}, got {}",
195 metadata.checksum, checksum
196 )));
197 }
198
199 Ok(())
200 }
201
202 pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
204 let mut backups = Vec::new();
205
206 let entries = fs::read_dir(&self.config.backup_dir)
207 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
208
209 for entry in entries {
210 let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
211 let path = entry.path();
212
213 if path.extension().and_then(|s| s.to_str()) == Some("json") {
214 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
215 if let Some(backup_id) = stem.strip_suffix("_metadata") {
216 if let Ok(metadata) = self.load_metadata(backup_id) {
217 backups.push(metadata);
218 }
219 }
220 }
221 }
222 }
223
224 backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
226
227 Ok(backups)
228 }
229
230 pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
232 tracing::info!("Deleting backup: {}", backup_id);
233
234 let backup_path = self.get_backup_path(backup_id);
235 let metadata_path = self.get_metadata_path(backup_id);
236
237 if backup_path.exists() {
238 fs::remove_file(&backup_path)
239 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
240 }
241
242 if metadata_path.exists() {
243 fs::remove_file(&metadata_path)
244 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
245 }
246
247 Ok(())
248 }
249
250 pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
252 let mut backups = self.list_backups()?;
253
254 if backups.len() <= keep_count {
255 return Ok(0);
256 }
257
258 backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
260
261 let to_delete = backups.split_off(keep_count);
262 let delete_count = to_delete.len();
263
264 for backup in to_delete {
265 self.delete_backup(&backup.backup_id)?;
266 }
267
268 tracing::info!("Cleaned up {} old backups", delete_count);
269
270 Ok(delete_count)
271 }
272
273 fn get_backup_path(&self, backup_id: &str) -> PathBuf {
276 self.config
277 .backup_dir
278 .join(format!("{}.backup.gz", backup_id))
279 }
280
281 fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
282 self.config
283 .backup_dir
284 .join(format!("{}_metadata.json", backup_id))
285 }
286
287 fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
288 let path = self.get_metadata_path(&metadata.backup_id);
289 let json = serde_json::to_string_pretty(metadata)?;
290
291 fs::write(&path, json)
292 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
293
294 Ok(())
295 }
296
297 fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
298 let path = self.get_metadata_path(backup_id);
299
300 if !path.exists() {
301 return Err(AllSourceError::ValidationError(
302 "Backup metadata not found".to_string(),
303 ));
304 }
305
306 let json = fs::read_to_string(&path)
307 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
308
309 Ok(serde_json::from_str(&json)?)
310 }
311
312 fn calculate_checksum(&self, path: &Path) -> Result<String> {
313 use sha2::{Sha256, Digest};
314
315 let mut file = File::open(path)
316 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
317
318 let mut hasher = Sha256::new();
319 let mut buffer = [0; 8192];
320
321 loop {
322 let count = file
323 .read(&mut buffer)
324 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
325
326 if count == 0 {
327 break;
328 }
329
330 hasher.update(&buffer[..count]);
331 }
332
333 Ok(format!("{:x}", hasher.finalize()))
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// The default configuration verifies every backup right after writing it.
    #[test]
    fn test_backup_config_default() {
        let BackupConfig {
            verify_after_backup,
            ..
        } = BackupConfig::default();

        assert!(verify_after_backup);
    }

    /// `BackupType::Full` survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_backup_type_serialization() {
        let original = BackupType::Full;

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: BackupType = serde_json::from_str(&encoded).unwrap();

        assert_eq!(original, decoded);
    }
}
355}