1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11pub struct WriteAheadLog {
13 wal_dir: PathBuf,
15
16 current_file: Arc<RwLock<WALFile>>,
18
19 config: WALConfig,
21
22 stats: Arc<RwLock<WALStats>>,
24
25 sequence: Arc<RwLock<u64>>,
27}
28
/// Tunable settings for a write-ahead log.
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Size in bytes at which the current file is rotated: rotation happens
    /// once the file has grown to at least this size.
    pub max_file_size: usize,

    /// When `true`, every appended entry is flushed and synced to disk
    /// before the append returns.
    pub sync_on_write: bool,

    /// Maximum number of WAL files kept on disk; older files beyond this
    /// count are removed after a rotation.
    pub max_wal_files: usize,

    /// Compression switch.
    // NOTE(review): nothing visible in this file reads `compress` — confirm
    // whether it is consumed elsewhere or is a not-yet-implemented option.
    pub compress: bool,
}

impl Default for WALConfig {
    /// Defaults: 64 MiB files, sync on every write, keep 10 files, no compression.
    fn default() -> Self {
        const DEFAULT_MAX_FILE_SIZE: usize = 64 * 1024 * 1024;
        const DEFAULT_MAX_WAL_FILES: usize = 10;

        Self {
            max_file_size: DEFAULT_MAX_FILE_SIZE,
            sync_on_write: true,
            max_wal_files: DEFAULT_MAX_WAL_FILES,
            compress: false,
        }
    }
}
54
55#[derive(Debug, Clone, Default, Serialize)]
56pub struct WALStats {
57 pub total_entries: u64,
58 pub total_bytes_written: u64,
59 pub current_file_size: usize,
60 pub files_rotated: u64,
61 pub files_cleaned: u64,
62 pub recovery_count: u64,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct WALEntry {
68 pub sequence: u64,
70
71 pub wal_timestamp: DateTime<Utc>,
73
74 pub event: Event,
76
77 pub checksum: u32,
79}
80
81impl WALEntry {
82 pub fn new(sequence: u64, event: Event) -> Self {
83 let mut entry = Self {
84 sequence,
85 wal_timestamp: Utc::now(),
86 event,
87 checksum: 0,
88 };
89 entry.checksum = entry.calculate_checksum();
90 entry
91 }
92
93 fn calculate_checksum(&self) -> u32 {
94 let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
96 crc32fast::hash(data.as_bytes())
97 }
98
99 pub fn verify(&self) -> bool {
100 self.checksum == self.calculate_checksum()
101 }
102}
103
104struct WALFile {
106 path: PathBuf,
107 writer: BufWriter<File>,
108 size: usize,
109 created_at: DateTime<Utc>,
110}
111
112impl WALFile {
113 fn new(path: PathBuf) -> Result<Self> {
114 let file = OpenOptions::new()
115 .create(true)
116 .append(true)
117 .open(&path)
118 .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {}", e)))?;
119
120 let size = file
121 .metadata()
122 .map(|m| m.len() as usize)
123 .unwrap_or(0);
124
125 Ok(Self {
126 path,
127 writer: BufWriter::new(file),
128 size,
129 created_at: Utc::now(),
130 })
131 }
132
133 fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
134 let json = serde_json::to_string(entry)?;
136
137 let line = format!("{}\n", json);
138 let bytes_written = line.len();
139
140 self.writer
141 .write_all(line.as_bytes())
142 .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {}", e)))?;
143
144 if sync {
145 self.writer
146 .flush()
147 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
148
149 self.writer
150 .get_ref()
151 .sync_all()
152 .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {}", e)))?;
153 }
154
155 self.size += bytes_written;
156 Ok(bytes_written)
157 }
158
159 fn flush(&mut self) -> Result<()> {
160 self.writer
161 .flush()
162 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
163 Ok(())
164 }
165}
166
167impl WriteAheadLog {
168 pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
170 let wal_dir = wal_dir.into();
171
172 fs::create_dir_all(&wal_dir)
174 .map_err(|e| AllSourceError::StorageError(format!("Failed to create WAL directory: {}", e)))?;
175
176 let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
178 let current_file = WALFile::new(initial_file_path)?;
179
180 tracing::info!("โ
WAL initialized at: {}", wal_dir.display());
181
182 Ok(Self {
183 wal_dir,
184 current_file: Arc::new(RwLock::new(current_file)),
185 config,
186 stats: Arc::new(RwLock::new(WALStats::default())),
187 sequence: Arc::new(RwLock::new(0)),
188 })
189 }
190
191 fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
193 dir.join(format!("wal-{:016x}.log", sequence))
194 }
195
196 pub fn append(&self, event: Event) -> Result<u64> {
198 let mut seq = self.sequence.write();
200 *seq += 1;
201 let sequence = *seq;
202 drop(seq);
203
204 let entry = WALEntry::new(sequence, event);
206
207 let mut current = self.current_file.write();
209 let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
210
211 let mut stats = self.stats.write();
213 stats.total_entries += 1;
214 stats.total_bytes_written += bytes_written as u64;
215 stats.current_file_size = current.size;
216 drop(stats);
217
218 let should_rotate = current.size >= self.config.max_file_size;
220 drop(current);
221
222 if should_rotate {
223 self.rotate()?;
224 }
225
226 tracing::trace!("WAL entry written: sequence={}", sequence);
227
228 Ok(sequence)
229 }
230
231 fn rotate(&self) -> Result<()> {
233 let seq = *self.sequence.read();
234 let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
235
236 tracing::info!("๐ Rotating WAL to new file: {:?}", new_file_path);
237
238 let new_file = WALFile::new(new_file_path)?;
239
240 let mut current = self.current_file.write();
241 current.flush()?;
242 *current = new_file;
243
244 let mut stats = self.stats.write();
245 stats.files_rotated += 1;
246 stats.current_file_size = 0;
247 drop(stats);
248
249 self.cleanup_old_files()?;
251
252 Ok(())
253 }
254
255 fn cleanup_old_files(&self) -> Result<()> {
257 let mut wal_files = self.list_wal_files()?;
258 wal_files.sort();
259
260 if wal_files.len() > self.config.max_wal_files {
261 let to_remove = wal_files.len() - self.config.max_wal_files;
262 let files_to_delete = &wal_files[..to_remove];
263
264 for file_path in files_to_delete {
265 if let Err(e) = fs::remove_file(file_path) {
266 tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
267 } else {
268 tracing::debug!("๐๏ธ Removed old WAL file: {:?}", file_path);
269 let mut stats = self.stats.write();
270 stats.files_cleaned += 1;
271 }
272 }
273 }
274
275 Ok(())
276 }
277
278 fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
280 let entries = fs::read_dir(&self.wal_dir)
281 .map_err(|e| AllSourceError::StorageError(format!("Failed to read WAL directory: {}", e)))?;
282
283 let mut wal_files = Vec::new();
284 for entry in entries {
285 let entry = entry.map_err(|e| {
286 AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
287 })?;
288
289 let path = entry.path();
290 if let Some(name) = path.file_name() {
291 if name.to_string_lossy().starts_with("wal-") && name.to_string_lossy().ends_with(".log") {
292 wal_files.push(path);
293 }
294 }
295 }
296
297 Ok(wal_files)
298 }
299
300 pub fn recover(&self) -> Result<Vec<Event>> {
302 tracing::info!("๐ Starting WAL recovery...");
303
304 let mut wal_files = self.list_wal_files()?;
305 wal_files.sort();
306
307 let mut recovered_events = Vec::new();
308 let mut max_sequence = 0u64;
309 let mut corrupted_entries = 0;
310
311 for wal_file_path in &wal_files {
312 tracing::debug!("Reading WAL file: {:?}", wal_file_path);
313
314 let file = File::open(wal_file_path).map_err(|e| {
315 AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {}", e))
316 })?;
317
318 let reader = BufReader::new(file);
319
320 for (line_num, line) in reader.lines().enumerate() {
321 let line = line.map_err(|e| {
322 AllSourceError::StorageError(format!("Failed to read WAL line: {}", e))
323 })?;
324
325 if line.trim().is_empty() {
326 continue;
327 }
328
329 match serde_json::from_str::<WALEntry>(&line) {
330 Ok(entry) => {
331 if !entry.verify() {
333 tracing::warn!(
334 "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
335 wal_file_path,
336 line_num + 1
337 );
338 corrupted_entries += 1;
339 continue;
340 }
341
342 max_sequence = max_sequence.max(entry.sequence);
343 recovered_events.push(entry.event);
344 }
345 Err(e) => {
346 tracing::warn!(
347 "Failed to parse WAL entry at {:?}:{}: {}",
348 wal_file_path,
349 line_num + 1,
350 e
351 );
352 corrupted_entries += 1;
353 }
354 }
355 }
356 }
357
358 let mut seq = self.sequence.write();
360 *seq = max_sequence;
361 drop(seq);
362
363 let mut stats = self.stats.write();
365 stats.recovery_count += 1;
366 drop(stats);
367
368 tracing::info!(
369 "โ
WAL recovery complete: {} events recovered, {} corrupted entries",
370 recovered_events.len(),
371 corrupted_entries
372 );
373
374 Ok(recovered_events)
375 }
376
377 pub fn flush(&self) -> Result<()> {
379 let mut current = self.current_file.write();
380 current.flush()?;
381 Ok(())
382 }
383
384 pub fn truncate(&self) -> Result<()> {
386 tracing::info!("๐งน Truncating WAL after checkpoint");
387
388 let mut current = self.current_file.write();
390 current.flush()?;
391
392 let wal_files = self.list_wal_files()?;
394 for file_path in wal_files {
395 fs::remove_file(&file_path).map_err(|e| {
396 AllSourceError::StorageError(format!("Failed to remove WAL file: {}", e))
397 })?;
398 tracing::debug!("Removed WAL file: {:?}", file_path);
399 }
400
401 let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
403 *current = WALFile::new(new_file_path)?;
404
405 let mut seq = self.sequence.write();
407 *seq = 0;
408
409 tracing::info!("โ
WAL truncated successfully");
410
411 Ok(())
412 }
413
414 pub fn stats(&self) -> WALStats {
416 (*self.stats.read()).clone()
417 }
418
419 pub fn current_sequence(&self) -> u64 {
421 *self.sequence.read()
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a throwaway event with a fresh random id.
    fn create_test_event() -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    /// Opens a WAL inside `dir` with the given configuration.
    fn open_wal(dir: &TempDir, config: WALConfig) -> WriteAheadLog {
        WriteAheadLog::new(dir.path(), config).unwrap()
    }

    /// Appends `count` freshly generated events, panicking on any failure.
    fn append_events(wal: &WriteAheadLog, count: usize) {
        for _ in 0..count {
            wal.append(create_test_event()).unwrap();
        }
    }

    #[test]
    fn test_wal_creation() {
        let temp_dir = TempDir::new().unwrap();

        let created = WriteAheadLog::new(temp_dir.path(), WALConfig::default());

        assert!(created.is_ok());
    }

    #[test]
    fn test_wal_append() {
        let temp_dir = TempDir::new().unwrap();
        let wal = open_wal(&temp_dir, WALConfig::default());

        let outcome = wal.append(create_test_event());
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 1);

        assert_eq!(wal.stats().total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let temp_dir = TempDir::new().unwrap();
        let writer = open_wal(&temp_dir, WALConfig::default());

        append_events(&writer, 5);
        writer.flush().unwrap();

        // A second instance over the same directory must see every entry.
        let reader = open_wal(&temp_dir, WALConfig::default());
        let recovered = reader.recover().unwrap();

        assert_eq!(recovered.len(), 5);
    }

    #[test]
    fn test_wal_rotation() {
        let temp_dir = TempDir::new().unwrap();
        // Tiny file limit so that a handful of events forces rotation.
        let small_files = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };
        let wal = open_wal(&temp_dir, small_files);

        append_events(&wal, 50);

        assert!(wal.stats().files_rotated > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let entry = WALEntry::new(1, create_test_event());
        assert!(entry.verify());

        // Overwriting the stored checksum must make verification fail.
        let mut corrupted = entry.clone();
        corrupted.checksum = 0;
        assert!(!corrupted.verify());
    }

    #[test]
    fn test_wal_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let wal = open_wal(&temp_dir, WALConfig::default());

        append_events(&wal, 5);
        wal.truncate().unwrap();

        assert_eq!(wal.current_sequence(), 0);

        let recovered = wal.recover().unwrap();
        assert_eq!(recovered.len(), 0);
    }
}