1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11pub struct WriteAheadLog {
13 wal_dir: PathBuf,
15
16 current_file: Arc<RwLock<WALFile>>,
18
19 config: WALConfig,
21
22 stats: Arc<RwLock<WALStats>>,
24
25 sequence: Arc<RwLock<u64>>,
27}
28
/// Tuning knobs for [`WriteAheadLog`].
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Segment size in bytes at or above which the log rotates to a new file.
    pub max_file_size: usize,

    /// When `true`, each append is flushed and `sync_all`ed before returning.
    pub sync_on_write: bool,

    /// Maximum number of segment files kept on disk after a rotation.
    pub max_wal_files: usize,

    /// Compression toggle.
    ///
    /// NOTE(review): nothing in this module reads this flag — entries are
    /// always written as plain JSON. Confirm whether another module uses it.
    pub compress: bool,
}
43
44impl Default for WALConfig {
45 fn default() -> Self {
46 Self {
47 max_file_size: 64 * 1024 * 1024, sync_on_write: true,
49 max_wal_files: 10,
50 compress: false,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Default, Serialize)]
56pub struct WALStats {
57 pub total_entries: u64,
58 pub total_bytes_written: u64,
59 pub current_file_size: usize,
60 pub files_rotated: u64,
61 pub files_cleaned: u64,
62 pub recovery_count: u64,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct WALEntry {
68 pub sequence: u64,
70
71 pub wal_timestamp: DateTime<Utc>,
73
74 pub event: Event,
76
77 pub checksum: u32,
79}
80
81impl WALEntry {
82 pub fn new(sequence: u64, event: Event) -> Self {
83 let mut entry = Self {
84 sequence,
85 wal_timestamp: Utc::now(),
86 event,
87 checksum: 0,
88 };
89 entry.checksum = entry.calculate_checksum();
90 entry
91 }
92
93 fn calculate_checksum(&self) -> u32 {
94 let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
96 crc32fast::hash(data.as_bytes())
97 }
98
99 pub fn verify(&self) -> bool {
100 self.checksum == self.calculate_checksum()
101 }
102}
103
104struct WALFile {
106 path: PathBuf,
107 writer: BufWriter<File>,
108 size: usize,
109 created_at: DateTime<Utc>,
110}
111
112impl WALFile {
113 fn new(path: PathBuf) -> Result<Self> {
114 let file = OpenOptions::new()
115 .create(true)
116 .append(true)
117 .open(&path)
118 .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {}", e)))?;
119
120 let size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
121
122 Ok(Self {
123 path,
124 writer: BufWriter::new(file),
125 size,
126 created_at: Utc::now(),
127 })
128 }
129
130 fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
131 let json = serde_json::to_string(entry)?;
133
134 let line = format!("{}\n", json);
135 let bytes_written = line.len();
136
137 self.writer
138 .write_all(line.as_bytes())
139 .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {}", e)))?;
140
141 if sync {
142 self.writer
143 .flush()
144 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
145
146 self.writer
147 .get_ref()
148 .sync_all()
149 .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {}", e)))?;
150 }
151
152 self.size += bytes_written;
153 Ok(bytes_written)
154 }
155
156 fn flush(&mut self) -> Result<()> {
157 self.writer
158 .flush()
159 .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
160 Ok(())
161 }
162}
163
164impl WriteAheadLog {
165 pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
167 let wal_dir = wal_dir.into();
168
169 fs::create_dir_all(&wal_dir).map_err(|e| {
171 AllSourceError::StorageError(format!("Failed to create WAL directory: {}", e))
172 })?;
173
174 let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
176 let current_file = WALFile::new(initial_file_path)?;
177
178 tracing::info!("โ
WAL initialized at: {}", wal_dir.display());
179
180 Ok(Self {
181 wal_dir,
182 current_file: Arc::new(RwLock::new(current_file)),
183 config,
184 stats: Arc::new(RwLock::new(WALStats::default())),
185 sequence: Arc::new(RwLock::new(0)),
186 })
187 }
188
189 fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
191 dir.join(format!("wal-{:016x}.log", sequence))
192 }
193
194 pub fn append(&self, event: Event) -> Result<u64> {
196 let mut seq = self.sequence.write();
198 *seq += 1;
199 let sequence = *seq;
200 drop(seq);
201
202 let entry = WALEntry::new(sequence, event);
204
205 let mut current = self.current_file.write();
207 let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
208
209 let mut stats = self.stats.write();
211 stats.total_entries += 1;
212 stats.total_bytes_written += bytes_written as u64;
213 stats.current_file_size = current.size;
214 drop(stats);
215
216 let should_rotate = current.size >= self.config.max_file_size;
218 drop(current);
219
220 if should_rotate {
221 self.rotate()?;
222 }
223
224 tracing::trace!("WAL entry written: sequence={}", sequence);
225
226 Ok(sequence)
227 }
228
229 fn rotate(&self) -> Result<()> {
231 let seq = *self.sequence.read();
232 let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
233
234 tracing::info!("๐ Rotating WAL to new file: {:?}", new_file_path);
235
236 let new_file = WALFile::new(new_file_path)?;
237
238 let mut current = self.current_file.write();
239 current.flush()?;
240 *current = new_file;
241
242 let mut stats = self.stats.write();
243 stats.files_rotated += 1;
244 stats.current_file_size = 0;
245 drop(stats);
246
247 self.cleanup_old_files()?;
249
250 Ok(())
251 }
252
253 fn cleanup_old_files(&self) -> Result<()> {
255 let mut wal_files = self.list_wal_files()?;
256 wal_files.sort();
257
258 if wal_files.len() > self.config.max_wal_files {
259 let to_remove = wal_files.len() - self.config.max_wal_files;
260 let files_to_delete = &wal_files[..to_remove];
261
262 for file_path in files_to_delete {
263 if let Err(e) = fs::remove_file(file_path) {
264 tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
265 } else {
266 tracing::debug!("๐๏ธ Removed old WAL file: {:?}", file_path);
267 let mut stats = self.stats.write();
268 stats.files_cleaned += 1;
269 }
270 }
271 }
272
273 Ok(())
274 }
275
276 fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
278 let entries = fs::read_dir(&self.wal_dir).map_err(|e| {
279 AllSourceError::StorageError(format!("Failed to read WAL directory: {}", e))
280 })?;
281
282 let mut wal_files = Vec::new();
283 for entry in entries {
284 let entry = entry.map_err(|e| {
285 AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
286 })?;
287
288 let path = entry.path();
289 if let Some(name) = path.file_name() {
290 if name.to_string_lossy().starts_with("wal-")
291 && name.to_string_lossy().ends_with(".log")
292 {
293 wal_files.push(path);
294 }
295 }
296 }
297
298 Ok(wal_files)
299 }
300
301 pub fn recover(&self) -> Result<Vec<Event>> {
303 tracing::info!("๐ Starting WAL recovery...");
304
305 let mut wal_files = self.list_wal_files()?;
306 wal_files.sort();
307
308 let mut recovered_events = Vec::new();
309 let mut max_sequence = 0u64;
310 let mut corrupted_entries = 0;
311
312 for wal_file_path in &wal_files {
313 tracing::debug!("Reading WAL file: {:?}", wal_file_path);
314
315 let file = File::open(wal_file_path).map_err(|e| {
316 AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {}", e))
317 })?;
318
319 let reader = BufReader::new(file);
320
321 for (line_num, line) in reader.lines().enumerate() {
322 let line = line.map_err(|e| {
323 AllSourceError::StorageError(format!("Failed to read WAL line: {}", e))
324 })?;
325
326 if line.trim().is_empty() {
327 continue;
328 }
329
330 match serde_json::from_str::<WALEntry>(&line) {
331 Ok(entry) => {
332 if !entry.verify() {
334 tracing::warn!(
335 "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
336 wal_file_path,
337 line_num + 1
338 );
339 corrupted_entries += 1;
340 continue;
341 }
342
343 max_sequence = max_sequence.max(entry.sequence);
344 recovered_events.push(entry.event);
345 }
346 Err(e) => {
347 tracing::warn!(
348 "Failed to parse WAL entry at {:?}:{}: {}",
349 wal_file_path,
350 line_num + 1,
351 e
352 );
353 corrupted_entries += 1;
354 }
355 }
356 }
357 }
358
359 let mut seq = self.sequence.write();
361 *seq = max_sequence;
362 drop(seq);
363
364 let mut stats = self.stats.write();
366 stats.recovery_count += 1;
367 drop(stats);
368
369 tracing::info!(
370 "โ
WAL recovery complete: {} events recovered, {} corrupted entries",
371 recovered_events.len(),
372 corrupted_entries
373 );
374
375 Ok(recovered_events)
376 }
377
378 pub fn flush(&self) -> Result<()> {
380 let mut current = self.current_file.write();
381 current.flush()?;
382 Ok(())
383 }
384
385 pub fn truncate(&self) -> Result<()> {
387 tracing::info!("๐งน Truncating WAL after checkpoint");
388
389 let mut current = self.current_file.write();
391 current.flush()?;
392
393 let wal_files = self.list_wal_files()?;
395 for file_path in wal_files {
396 fs::remove_file(&file_path).map_err(|e| {
397 AllSourceError::StorageError(format!("Failed to remove WAL file: {}", e))
398 })?;
399 tracing::debug!("Removed WAL file: {:?}", file_path);
400 }
401
402 let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
404 *current = WALFile::new(new_file_path)?;
405
406 let mut seq = self.sequence.write();
408 *seq = 0;
409
410 tracing::info!("โ
WAL truncated successfully");
411
412 Ok(())
413 }
414
415 pub fn stats(&self) -> WALStats {
417 (*self.stats.read()).clone()
418 }
419
420 pub fn current_sequence(&self) -> u64 {
422 *self.sequence.read()
423 }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a minimal event with a fresh v4 id.
    fn create_test_event() -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_wal_creation() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default());
        assert!(wal.is_ok());
    }

    #[test]
    fn test_wal_append() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        let event = create_test_event();
        let seq = wal.append(event);
        assert!(seq.is_ok());
        assert_eq!(seq.unwrap(), 1);

        let stats = wal.stats();
        assert_eq!(stats.total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        for _ in 0..5 {
            wal.append(create_test_event()).unwrap();
        }

        wal.flush().unwrap();

        let wal2 = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
        let recovered = wal2.recover().unwrap();

        assert_eq!(recovered.len(), 5);
    }

    #[test]
    fn test_wal_recover_restores_sequence() {
        let temp_dir = TempDir::new().unwrap();
        {
            let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
            for _ in 0..3 {
                wal.append(create_test_event()).unwrap();
            }
            wal.flush().unwrap();
        }

        let reopened = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
        assert_eq!(reopened.current_sequence(), 0);

        reopened.recover().unwrap();
        assert_eq!(reopened.current_sequence(), 3);
        // Numbering continues after the recovered maximum.
        assert_eq!(reopened.append(create_test_event()).unwrap(), 4);
    }

    #[test]
    fn test_wal_rotation() {
        let temp_dir = TempDir::new().unwrap();
        let config = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };

        let wal = WriteAheadLog::new(temp_dir.path(), config).unwrap();

        for _ in 0..50 {
            wal.append(create_test_event()).unwrap();
        }

        let stats = wal.stats();
        assert!(stats.files_rotated > 0);
    }

    #[test]
    fn test_wal_cleanup_respects_max_wal_files() {
        let temp_dir = TempDir::new().unwrap();
        let config = WALConfig {
            max_file_size: 128,
            sync_on_write: false,
            max_wal_files: 3,
            ..Default::default()
        };

        let wal = WriteAheadLog::new(temp_dir.path(), config).unwrap();
        for _ in 0..30 {
            wal.append(create_test_event()).unwrap();
        }

        let on_disk = std::fs::read_dir(temp_dir.path()).unwrap().count();
        assert!(on_disk <= 3, "expected at most 3 WAL files, found {}", on_disk);
        assert!(wal.stats().files_cleaned > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let event = create_test_event();
        let entry = WALEntry::new(1, event);

        assert!(entry.verify());

        // Derive the bad value from the real one so the test cannot pass by
        // coincidence (the genuine CRC could itself be 0).
        let mut corrupted = entry.clone();
        corrupted.checksum = entry.checksum.wrapping_add(1);
        assert!(!corrupted.verify());
    }

    #[test]
    fn test_wal_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        for _ in 0..5 {
            wal.append(create_test_event()).unwrap();
        }

        wal.truncate().unwrap();

        assert_eq!(wal.current_sequence(), 0);

        let recovered = wal.recover().unwrap();
        assert_eq!(recovered.len(), 0);
    }
}