allsource_core/infrastructure/persistence/
backup.rs1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
12use chrono::{DateTime, Utc};
13use flate2::{Compression, read::GzDecoder, write::GzEncoder};
14use serde::{Deserialize, Serialize};
15use std::{
16 fs::{self, File},
17 io::{Read, Write},
18 path::{Path, PathBuf},
19};
20use uuid::Uuid;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct BackupMetadata {
25 pub backup_id: String,
26 pub created_at: DateTime<Utc>,
27 pub backup_type: BackupType,
28 pub event_count: u64,
29 pub size_bytes: u64,
30 pub checksum: String,
31 pub from_sequence: Option<u64>,
32 pub to_sequence: u64,
33 pub compressed: bool,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38pub enum BackupType {
39 Full,
40 Incremental { from_backup_id: String },
41}
42
43#[derive(Debug, Clone)]
45pub struct BackupConfig {
46 pub backup_dir: PathBuf,
47 pub compression_level: Compression,
48 pub verify_after_backup: bool,
49}
50
51impl Default for BackupConfig {
52 fn default() -> Self {
53 Self {
54 backup_dir: PathBuf::from("./backups"),
55 compression_level: Compression::default(),
56 verify_after_backup: true,
57 }
58 }
59}
60
61pub struct BackupManager {
63 config: BackupConfig,
64}
65
66impl BackupManager {
67 pub fn new(config: BackupConfig) -> Result<Self> {
68 fs::create_dir_all(&config.backup_dir).map_err(|e| {
70 AllSourceError::StorageError(format!("Failed to create backup dir: {e}"))
71 })?;
72
73 Ok(Self { config })
74 }
75
76 pub fn create_backup(&self, events: &[Event]) -> Result<BackupMetadata> {
78 let backup_id = format!("full_{}", Uuid::new_v4());
79 let timestamp = Utc::now();
80
81 tracing::info!("Creating backup: {}", backup_id);
82
83 let event_count = events.len() as u64;
84
85 if event_count == 0 {
86 return Err(AllSourceError::ValidationError(
87 "No events to backup".to_string(),
88 ));
89 }
90
91 let json_data = serde_json::to_string(&events)?;
93
94 let backup_path = self.get_backup_path(&backup_id);
96 let mut encoder = GzEncoder::new(
97 File::create(&backup_path).map_err(|e| {
98 AllSourceError::StorageError(format!("Failed to create backup file: {e}"))
99 })?,
100 self.config.compression_level,
101 );
102
103 encoder
104 .write_all(json_data.as_bytes())
105 .map_err(|e| AllSourceError::StorageError(format!("Failed to write backup: {e}")))?;
106
107 encoder.finish().map_err(|e| {
108 AllSourceError::StorageError(format!("Failed to finish compression: {e}"))
109 })?;
110
111 let size_bytes = fs::metadata(&backup_path)
112 .map_err(|e| AllSourceError::StorageError(e.to_string()))?
113 .len();
114
115 let checksum = self.calculate_checksum(&backup_path)?;
116
117 let metadata = BackupMetadata {
118 backup_id: backup_id.clone(),
119 created_at: timestamp,
120 backup_type: BackupType::Full,
121 event_count,
122 size_bytes,
123 checksum,
124 from_sequence: None,
125 to_sequence: event_count,
126 compressed: true,
127 };
128
129 self.save_metadata(&metadata)?;
131
132 if self.config.verify_after_backup {
134 self.verify_backup(&metadata)?;
135 }
136
137 tracing::info!(
138 "Backup complete: {} events, {} bytes compressed",
139 event_count,
140 size_bytes
141 );
142
143 Ok(metadata)
144 }
145
146 pub fn restore_from_backup(&self, backup_id: &str) -> Result<Vec<Event>> {
148 tracing::info!("Restoring from backup: {}", backup_id);
149
150 let metadata = self.load_metadata(backup_id)?;
151
152 self.verify_backup(&metadata)?;
154
155 let backup_path = self.get_backup_path(backup_id);
156
157 let file = File::open(&backup_path)
159 .map_err(|e| AllSourceError::StorageError(format!("Failed to open backup: {e}")))?;
160
161 let mut decoder = GzDecoder::new(file);
162 let mut json_data = String::new();
163 decoder.read_to_string(&mut json_data).map_err(|e| {
164 AllSourceError::StorageError(format!("Failed to decompress backup: {e}"))
165 })?;
166
167 let events: Vec<Event> = serde_json::from_str(&json_data)?;
169
170 if events.len() != metadata.event_count as usize {
171 return Err(AllSourceError::ValidationError(format!(
172 "Event count mismatch: expected {}, got {}",
173 metadata.event_count,
174 events.len()
175 )));
176 }
177
178 tracing::info!("Restored {} events from backup", events.len());
179
180 Ok(events)
181 }
182
183 pub fn verify_backup(&self, metadata: &BackupMetadata) -> Result<()> {
185 let backup_path = self.get_backup_path(&metadata.backup_id);
186
187 if !backup_path.exists() {
188 return Err(AllSourceError::ValidationError(
189 "Backup file not found".to_string(),
190 ));
191 }
192
193 let checksum = self.calculate_checksum(&backup_path)?;
194
195 if checksum != metadata.checksum {
196 return Err(AllSourceError::ValidationError(format!(
197 "Checksum mismatch: expected {}, got {}",
198 metadata.checksum, checksum
199 )));
200 }
201
202 Ok(())
203 }
204
205 pub fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
207 let mut backups = Vec::new();
208
209 let entries = fs::read_dir(&self.config.backup_dir)
210 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
211
212 for entry in entries {
213 let entry = entry.map_err(|e| AllSourceError::StorageError(e.to_string()))?;
214 let path = entry.path();
215
216 if path.extension().and_then(|s| s.to_str()) == Some("json")
217 && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
218 && let Some(backup_id) = stem.strip_suffix("_metadata")
219 && let Ok(metadata) = self.load_metadata(backup_id)
220 {
221 backups.push(metadata);
222 }
223 }
224
225 backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
227
228 Ok(backups)
229 }
230
231 pub fn delete_backup(&self, backup_id: &str) -> Result<()> {
233 tracing::info!("Deleting backup: {}", backup_id);
234
235 let backup_path = self.get_backup_path(backup_id);
236 let metadata_path = self.get_metadata_path(backup_id);
237
238 if backup_path.exists() {
239 fs::remove_file(&backup_path)
240 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
241 }
242
243 if metadata_path.exists() {
244 fs::remove_file(&metadata_path)
245 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
246 }
247
248 Ok(())
249 }
250
251 pub fn cleanup_old_backups(&self, keep_count: usize) -> Result<usize> {
253 let mut backups = self.list_backups()?;
254
255 if backups.len() <= keep_count {
256 return Ok(0);
257 }
258
259 backups.sort_by_key(|x| std::cmp::Reverse(x.created_at));
261
262 let to_delete = backups.split_off(keep_count);
263 let delete_count = to_delete.len();
264
265 for backup in to_delete {
266 self.delete_backup(&backup.backup_id)?;
267 }
268
269 tracing::info!("Cleaned up {} old backups", delete_count);
270
271 Ok(delete_count)
272 }
273
274 fn get_backup_path(&self, backup_id: &str) -> PathBuf {
277 self.config
278 .backup_dir
279 .join(format!("{backup_id}.backup.gz"))
280 }
281
282 fn get_metadata_path(&self, backup_id: &str) -> PathBuf {
283 self.config
284 .backup_dir
285 .join(format!("{backup_id}_metadata.json"))
286 }
287
288 fn save_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
289 let path = self.get_metadata_path(&metadata.backup_id);
290 let json = serde_json::to_string_pretty(metadata)?;
291
292 fs::write(&path, json).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
293
294 Ok(())
295 }
296
297 fn load_metadata(&self, backup_id: &str) -> Result<BackupMetadata> {
298 let path = self.get_metadata_path(backup_id);
299
300 if !path.exists() {
301 return Err(AllSourceError::ValidationError(
302 "Backup metadata not found".to_string(),
303 ));
304 }
305
306 let json =
307 fs::read_to_string(&path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
308
309 Ok(serde_json::from_str(&json)?)
310 }
311
312 fn calculate_checksum(&self, path: &Path) -> Result<String> {
313 use sha2::{Digest, Sha256};
314
315 let mut file = File::open(path).map_err(|e| AllSourceError::StorageError(e.to_string()))?;
316
317 let mut hasher = Sha256::new();
318 let mut buffer = [0; 8192];
319
320 loop {
321 let count = file
322 .read(&mut buffer)
323 .map_err(|e| AllSourceError::StorageError(e.to_string()))?;
324
325 if count == 0 {
326 break;
327 }
328
329 hasher.update(&buffer[..count]);
330 }
331
332 Ok(format!("{:x}", hasher.finalize()))
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration verifies every backup after writing it.
    #[test]
    fn test_backup_config_default() {
        let BackupConfig {
            verify_after_backup,
            ..
        } = BackupConfig::default();

        assert!(verify_after_backup);
    }

    /// `BackupType::Full` survives a JSON round trip unchanged.
    #[test]
    fn test_backup_type_serialization() {
        let original = BackupType::Full;

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<BackupType>(&encoded).unwrap();

        assert_eq!(original, decoded);
    }
}