// allsource_core/wal.rs

use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

11/// Write-Ahead Log for durability and crash recovery
12pub struct WriteAheadLog {
13    /// Directory where WAL files are stored
14    wal_dir: PathBuf,
15
16    /// Current active WAL file
17    current_file: Arc<RwLock<WALFile>>,
18
19    /// Configuration
20    config: WALConfig,
21
22    /// Statistics
23    stats: Arc<RwLock<WALStats>>,
24
25    /// Current sequence number
26    sequence: Arc<RwLock<u64>>,
27}
28
/// Tunable settings for a `WriteAheadLog`.
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Size threshold in bytes; once the active file reaches it, the WAL rotates
    /// to a new file.
    pub max_file_size: usize,

    /// When `true`, every append flushes the write buffer and calls
    /// `File::sync_all` before returning.
    pub sync_on_write: bool,

    /// Maximum number of WAL files kept on disk; the oldest are deleted after
    /// a rotation.
    pub max_wal_files: usize,

    /// Compression toggle.
    ///
    /// NOTE(review): nothing in the WAL code shown here reads this flag, so
    /// entries are written uncompressed regardless of its value.
    pub compress: bool,
}

impl Default for WALConfig {
    /// 64 MiB files, fsync after every write, at most 10 files, no compression.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        let max_file_size = 64 * MIB;
        Self {
            max_file_size,
            sync_on_write: true,
            max_wal_files: 10,
            compress: false,
        }
    }
}

55#[derive(Debug, Clone, Default, Serialize)]
56pub struct WALStats {
57    pub total_entries: u64,
58    pub total_bytes_written: u64,
59    pub current_file_size: usize,
60    pub files_rotated: u64,
61    pub files_cleaned: u64,
62    pub recovery_count: u64,
63}
64
65/// WAL entry wrapping an event
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct WALEntry {
68    /// Sequence number for ordering
69    pub sequence: u64,
70
71    /// Timestamp when written to WAL
72    pub wal_timestamp: DateTime<Utc>,
73
74    /// The event being logged
75    pub event: Event,
76
77    /// Checksum for integrity verification
78    pub checksum: u32,
79}
80
81impl WALEntry {
82    pub fn new(sequence: u64, event: Event) -> Self {
83        let mut entry = Self {
84            sequence,
85            wal_timestamp: Utc::now(),
86            event,
87            checksum: 0,
88        };
89        entry.checksum = entry.calculate_checksum();
90        entry
91    }
92
93    fn calculate_checksum(&self) -> u32 {
94        // Simple CRC32 checksum
95        let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
96        crc32fast::hash(data.as_bytes())
97    }
98
99    pub fn verify(&self) -> bool {
100        self.checksum == self.calculate_checksum()
101    }
102}
103
104/// Represents an active WAL file
105struct WALFile {
106    path: PathBuf,
107    writer: BufWriter<File>,
108    size: usize,
109    created_at: DateTime<Utc>,
110}
111
112impl WALFile {
113    fn new(path: PathBuf) -> Result<Self> {
114        let file = OpenOptions::new()
115            .create(true)
116            .append(true)
117            .open(&path)
118            .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {}", e)))?;
119
120        let size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
121
122        Ok(Self {
123            path,
124            writer: BufWriter::new(file),
125            size,
126            created_at: Utc::now(),
127        })
128    }
129
130    fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
131        // Serialize entry as JSON line
132        let json = serde_json::to_string(entry)?;
133
134        let line = format!("{}\n", json);
135        let bytes_written = line.len();
136
137        self.writer
138            .write_all(line.as_bytes())
139            .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {}", e)))?;
140
141        if sync {
142            self.writer
143                .flush()
144                .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
145
146            self.writer
147                .get_ref()
148                .sync_all()
149                .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {}", e)))?;
150        }
151
152        self.size += bytes_written;
153        Ok(bytes_written)
154    }
155
156    fn flush(&mut self) -> Result<()> {
157        self.writer
158            .flush()
159            .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
160        Ok(())
161    }
162}
163
164impl WriteAheadLog {
165    /// Create a new WAL
166    pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
167        let wal_dir = wal_dir.into();
168
169        // Create WAL directory if it doesn't exist
170        fs::create_dir_all(&wal_dir).map_err(|e| {
171            AllSourceError::StorageError(format!("Failed to create WAL directory: {}", e))
172        })?;
173
174        // Create initial WAL file
175        let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
176        let current_file = WALFile::new(initial_file_path)?;
177
178        tracing::info!("โœ… WAL initialized at: {}", wal_dir.display());
179
180        Ok(Self {
181            wal_dir,
182            current_file: Arc::new(RwLock::new(current_file)),
183            config,
184            stats: Arc::new(RwLock::new(WALStats::default())),
185            sequence: Arc::new(RwLock::new(0)),
186        })
187    }
188
189    /// Generate a WAL filename based on sequence
190    fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
191        dir.join(format!("wal-{:016x}.log", sequence))
192    }
193
194    /// Write an event to the WAL
195    pub fn append(&self, event: Event) -> Result<u64> {
196        // Get next sequence number
197        let mut seq = self.sequence.write();
198        *seq += 1;
199        let sequence = *seq;
200        drop(seq);
201
202        // Create WAL entry
203        let entry = WALEntry::new(sequence, event);
204
205        // Write to current file
206        let mut current = self.current_file.write();
207        let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
208
209        // Update statistics
210        let mut stats = self.stats.write();
211        stats.total_entries += 1;
212        stats.total_bytes_written += bytes_written as u64;
213        stats.current_file_size = current.size;
214        drop(stats);
215
216        // Check if we need to rotate
217        let should_rotate = current.size >= self.config.max_file_size;
218        drop(current);
219
220        if should_rotate {
221            self.rotate()?;
222        }
223
224        tracing::trace!("WAL entry written: sequence={}", sequence);
225
226        Ok(sequence)
227    }
228
229    /// Rotate to a new WAL file
230    fn rotate(&self) -> Result<()> {
231        let seq = *self.sequence.read();
232        let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
233
234        tracing::info!("๐Ÿ”„ Rotating WAL to new file: {:?}", new_file_path);
235
236        let new_file = WALFile::new(new_file_path)?;
237
238        let mut current = self.current_file.write();
239        current.flush()?;
240        *current = new_file;
241
242        let mut stats = self.stats.write();
243        stats.files_rotated += 1;
244        stats.current_file_size = 0;
245        drop(stats);
246
247        // Clean up old WAL files
248        self.cleanup_old_files()?;
249
250        Ok(())
251    }
252
253    /// Clean up old WAL files beyond the retention limit
254    fn cleanup_old_files(&self) -> Result<()> {
255        let mut wal_files = self.list_wal_files()?;
256        wal_files.sort();
257
258        if wal_files.len() > self.config.max_wal_files {
259            let to_remove = wal_files.len() - self.config.max_wal_files;
260            let files_to_delete = &wal_files[..to_remove];
261
262            for file_path in files_to_delete {
263                if let Err(e) = fs::remove_file(file_path) {
264                    tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
265                } else {
266                    tracing::debug!("๐Ÿ—‘๏ธ Removed old WAL file: {:?}", file_path);
267                    let mut stats = self.stats.write();
268                    stats.files_cleaned += 1;
269                }
270            }
271        }
272
273        Ok(())
274    }
275
276    /// List all WAL files in the directory
277    fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
278        let entries = fs::read_dir(&self.wal_dir).map_err(|e| {
279            AllSourceError::StorageError(format!("Failed to read WAL directory: {}", e))
280        })?;
281
282        let mut wal_files = Vec::new();
283        for entry in entries {
284            let entry = entry.map_err(|e| {
285                AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
286            })?;
287
288            let path = entry.path();
289            if let Some(name) = path.file_name() {
290                if name.to_string_lossy().starts_with("wal-")
291                    && name.to_string_lossy().ends_with(".log")
292                {
293                    wal_files.push(path);
294                }
295            }
296        }
297
298        Ok(wal_files)
299    }
300
301    /// Recover events from WAL files
302    pub fn recover(&self) -> Result<Vec<Event>> {
303        tracing::info!("๐Ÿ”„ Starting WAL recovery...");
304
305        let mut wal_files = self.list_wal_files()?;
306        wal_files.sort();
307
308        let mut recovered_events = Vec::new();
309        let mut max_sequence = 0u64;
310        let mut corrupted_entries = 0;
311
312        for wal_file_path in &wal_files {
313            tracing::debug!("Reading WAL file: {:?}", wal_file_path);
314
315            let file = File::open(wal_file_path).map_err(|e| {
316                AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {}", e))
317            })?;
318
319            let reader = BufReader::new(file);
320
321            for (line_num, line) in reader.lines().enumerate() {
322                let line = line.map_err(|e| {
323                    AllSourceError::StorageError(format!("Failed to read WAL line: {}", e))
324                })?;
325
326                if line.trim().is_empty() {
327                    continue;
328                }
329
330                match serde_json::from_str::<WALEntry>(&line) {
331                    Ok(entry) => {
332                        // Verify checksum
333                        if !entry.verify() {
334                            tracing::warn!(
335                                "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
336                                wal_file_path,
337                                line_num + 1
338                            );
339                            corrupted_entries += 1;
340                            continue;
341                        }
342
343                        max_sequence = max_sequence.max(entry.sequence);
344                        recovered_events.push(entry.event);
345                    }
346                    Err(e) => {
347                        tracing::warn!(
348                            "Failed to parse WAL entry at {:?}:{}: {}",
349                            wal_file_path,
350                            line_num + 1,
351                            e
352                        );
353                        corrupted_entries += 1;
354                    }
355                }
356            }
357        }
358
359        // Update sequence counter
360        let mut seq = self.sequence.write();
361        *seq = max_sequence;
362        drop(seq);
363
364        // Update stats
365        let mut stats = self.stats.write();
366        stats.recovery_count += 1;
367        drop(stats);
368
369        tracing::info!(
370            "โœ… WAL recovery complete: {} events recovered, {} corrupted entries",
371            recovered_events.len(),
372            corrupted_entries
373        );
374
375        Ok(recovered_events)
376    }
377
378    /// Manually flush the current WAL file
379    pub fn flush(&self) -> Result<()> {
380        let mut current = self.current_file.write();
381        current.flush()?;
382        Ok(())
383    }
384
385    /// Truncate WAL after successful checkpoint
386    pub fn truncate(&self) -> Result<()> {
387        tracing::info!("๐Ÿงน Truncating WAL after checkpoint");
388
389        // Close current file
390        let mut current = self.current_file.write();
391        current.flush()?;
392
393        // Remove all WAL files
394        let wal_files = self.list_wal_files()?;
395        for file_path in wal_files {
396            fs::remove_file(&file_path).map_err(|e| {
397                AllSourceError::StorageError(format!("Failed to remove WAL file: {}", e))
398            })?;
399            tracing::debug!("Removed WAL file: {:?}", file_path);
400        }
401
402        // Create new WAL file
403        let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
404        *current = WALFile::new(new_file_path)?;
405
406        // Reset sequence
407        let mut seq = self.sequence.write();
408        *seq = 0;
409
410        tracing::info!("โœ… WAL truncated successfully");
411
412        Ok(())
413    }
414
415    /// Get WAL statistics
416    pub fn stats(&self) -> WALStats {
417        (*self.stats.read()).clone()
418    }
419
420    /// Get current sequence number
421    pub fn current_sequence(&self) -> u64 {
422        *self.sequence.read()
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a throwaway `test.event` event with a random id.
    fn create_test_event() -> Event {
        let id = Uuid::new_v4();
        Event::reconstruct_from_strings(
            id,
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    /// Opens a default-configured WAL in a fresh temp directory. The `TempDir`
    /// is returned as well so the directory outlives the WAL.
    fn open_default_wal() -> (TempDir, WriteAheadLog) {
        let dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap();
        (dir, wal)
    }

    /// Appends `count` fresh test events, panicking on the first failure.
    fn append_events(wal: &WriteAheadLog, count: usize) {
        (0..count).for_each(|_| {
            wal.append(create_test_event()).unwrap();
        });
    }

    #[test]
    fn test_wal_creation() {
        let dir = TempDir::new().unwrap();
        let opened = WriteAheadLog::new(dir.path(), WALConfig::default());
        assert!(opened.is_ok());
    }

    #[test]
    fn test_wal_append() {
        let (_dir, wal) = open_default_wal();

        let assigned = wal.append(create_test_event()).ok();
        assert_eq!(assigned, Some(1));

        assert_eq!(wal.stats().total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let (dir, wal) = open_default_wal();
        append_events(&wal, 5);
        wal.flush().unwrap();

        // A second handle on the same directory stands in for a process restart.
        let restarted = WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap();
        let events = restarted.recover().unwrap();

        assert_eq!(events.len(), 5);
    }

    #[test]
    fn test_wal_rotation() {
        let dir = TempDir::new().unwrap();
        // A 1 KiB cap forces at least one rotation within 50 entries.
        let tiny = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };
        let wal = WriteAheadLog::new(dir.path(), tiny).unwrap();

        append_events(&wal, 50);

        assert!(wal.stats().files_rotated > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let entry = WALEntry::new(1, create_test_event());
        assert!(entry.verify());

        // Zeroing the stored checksum must make verification fail.
        let tampered = WALEntry {
            checksum: 0,
            ..entry.clone()
        };
        assert!(!tampered.verify());
    }

    #[test]
    fn test_wal_truncate() {
        let (_dir, wal) = open_default_wal();
        append_events(&wal, 5);

        wal.truncate().unwrap();

        assert_eq!(wal.current_sequence(), 0, "sequence should restart after truncate");
        assert!(wal.recover().unwrap().is_empty());
    }
}