allsource_core/
wal.rs

1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use chrono::{DateTime, Utc};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
/// Write-Ahead Log for durability and crash recovery.
///
/// Events are appended as newline-delimited JSON [`WALEntry`] records to files
/// named `wal-<16 lowercase hex digits>.log` inside `wal_dir`. Once the active file
/// reaches `WALConfig::max_file_size` bytes, appends move to a new file.
pub struct WriteAheadLog {
    /// Directory where WAL files are stored
    wal_dir: PathBuf,

    /// Current active WAL file (the one `append` writes to)
    current_file: Arc<RwLock<WALFile>>,

    /// Configuration
    config: WALConfig,

    /// Statistics (snapshot via `stats()`)
    stats: Arc<RwLock<WALStats>>,

    /// Current sequence number: the last value handed out by `append`.
    /// `recover` sets it to the highest recovered sequence; `truncate` resets it to 0.
    sequence: Arc<RwLock<u64>>,
}
28
/// Tuning knobs for [`WriteAheadLog`].
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Maximum size of a single WAL file before rotation (in bytes).
    /// Checked after each append, so a file can exceed this by up to one entry.
    pub max_file_size: usize,

    /// Whether to flush the buffer and fsync (`File::sync_all`) after each write.
    /// When false, entries may sit in an in-memory buffer until flushed.
    pub sync_on_write: bool,

    /// Maximum number of WAL files to keep; on rotation the lowest-named
    /// (oldest) files beyond this count are deleted.
    pub max_wal_files: usize,

    /// Enable WAL compression.
    /// NOTE(review): not read anywhere in this module; entries are written as
    /// plain JSON regardless of this flag.
    pub compress: bool,
}
43
44impl Default for WALConfig {
45    fn default() -> Self {
46        Self {
47            max_file_size: 64 * 1024 * 1024, // 64 MB
48            sync_on_write: true,
49            max_wal_files: 10,
50            compress: false,
51        }
52    }
53}
54
/// Counters describing WAL activity since this `WriteAheadLog` was created.
#[derive(Debug, Clone, Default, Serialize)]
pub struct WALStats {
    /// Number of entries successfully appended.
    pub total_entries: u64,
    /// Bytes handed to the writer for successful appends
    /// (each serialized JSON line including its trailing newline).
    pub total_bytes_written: u64,
    /// Size in bytes of the active WAL file, as tracked by the WAL.
    pub current_file_size: usize,
    /// Number of times appends moved to a new WAL file.
    pub files_rotated: u64,
    /// Number of old WAL files deleted by retention cleanup.
    pub files_cleaned: u64,
    /// Number of `recover` calls.
    pub recovery_count: u64,
}
64
/// WAL entry wrapping an event.
///
/// Each entry is serialized as a single JSON line in a WAL file; the field
/// names below are therefore part of the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WALEntry {
    /// Sequence number for ordering
    pub sequence: u64,

    /// Timestamp when written to WAL
    pub wal_timestamp: DateTime<Utc>,

    /// The event being logged
    pub event: Event,

    /// CRC32 checksum for integrity verification.
    /// Covers only `sequence`, `wal_timestamp` and the event id, not the event payload.
    pub checksum: u32,
}
80
81impl WALEntry {
82    pub fn new(sequence: u64, event: Event) -> Self {
83        let mut entry = Self {
84            sequence,
85            wal_timestamp: Utc::now(),
86            event,
87            checksum: 0,
88        };
89        entry.checksum = entry.calculate_checksum();
90        entry
91    }
92
93    fn calculate_checksum(&self) -> u32 {
94        // Simple CRC32 checksum
95        let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
96        crc32fast::hash(data.as_bytes())
97    }
98
99    pub fn verify(&self) -> bool {
100        self.checksum == self.calculate_checksum()
101    }
102}
103
/// Represents an active WAL file: an append-mode handle plus bookkeeping.
struct WALFile {
    /// Location of the file on disk.
    path: PathBuf,
    /// Buffered writer over the file; bytes reach the OS only on flush
    /// (or when the buffer fills).
    writer: BufWriter<File>,
    /// File length in bytes when opened, plus bytes appended through this handle.
    size: usize,
    /// Time this handle was opened (`Utc::now()` in `new`), not the file's creation time.
    /// NOTE(review): not read anywhere in this file.
    created_at: DateTime<Utc>,
}
111
112impl WALFile {
113    fn new(path: PathBuf) -> Result<Self> {
114        let file = OpenOptions::new()
115            .create(true)
116            .append(true)
117            .open(&path)
118            .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {}", e)))?;
119
120        let size = file
121            .metadata()
122            .map(|m| m.len() as usize)
123            .unwrap_or(0);
124
125        Ok(Self {
126            path,
127            writer: BufWriter::new(file),
128            size,
129            created_at: Utc::now(),
130        })
131    }
132
133    fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
134        // Serialize entry as JSON line
135        let json = serde_json::to_string(entry)?;
136
137        let line = format!("{}\n", json);
138        let bytes_written = line.len();
139
140        self.writer
141            .write_all(line.as_bytes())
142            .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {}", e)))?;
143
144        if sync {
145            self.writer
146                .flush()
147                .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
148
149            self.writer
150                .get_ref()
151                .sync_all()
152                .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {}", e)))?;
153        }
154
155        self.size += bytes_written;
156        Ok(bytes_written)
157    }
158
159    fn flush(&mut self) -> Result<()> {
160        self.writer
161            .flush()
162            .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
163        Ok(())
164    }
165}
166
167impl WriteAheadLog {
168    /// Create a new WAL
169    pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
170        let wal_dir = wal_dir.into();
171
172        // Create WAL directory if it doesn't exist
173        fs::create_dir_all(&wal_dir)
174            .map_err(|e| AllSourceError::StorageError(format!("Failed to create WAL directory: {}", e)))?;
175
176        // Create initial WAL file
177        let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
178        let current_file = WALFile::new(initial_file_path)?;
179
180        tracing::info!("โœ… WAL initialized at: {}", wal_dir.display());
181
182        Ok(Self {
183            wal_dir,
184            current_file: Arc::new(RwLock::new(current_file)),
185            config,
186            stats: Arc::new(RwLock::new(WALStats::default())),
187            sequence: Arc::new(RwLock::new(0)),
188        })
189    }
190
191    /// Generate a WAL filename based on sequence
192    fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
193        dir.join(format!("wal-{:016x}.log", sequence))
194    }
195
196    /// Write an event to the WAL
197    pub fn append(&self, event: Event) -> Result<u64> {
198        // Get next sequence number
199        let mut seq = self.sequence.write();
200        *seq += 1;
201        let sequence = *seq;
202        drop(seq);
203
204        // Create WAL entry
205        let entry = WALEntry::new(sequence, event);
206
207        // Write to current file
208        let mut current = self.current_file.write();
209        let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
210
211        // Update statistics
212        let mut stats = self.stats.write();
213        stats.total_entries += 1;
214        stats.total_bytes_written += bytes_written as u64;
215        stats.current_file_size = current.size;
216        drop(stats);
217
218        // Check if we need to rotate
219        let should_rotate = current.size >= self.config.max_file_size;
220        drop(current);
221
222        if should_rotate {
223            self.rotate()?;
224        }
225
226        tracing::trace!("WAL entry written: sequence={}", sequence);
227
228        Ok(sequence)
229    }
230
231    /// Rotate to a new WAL file
232    fn rotate(&self) -> Result<()> {
233        let seq = *self.sequence.read();
234        let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
235
236        tracing::info!("๐Ÿ”„ Rotating WAL to new file: {:?}", new_file_path);
237
238        let new_file = WALFile::new(new_file_path)?;
239
240        let mut current = self.current_file.write();
241        current.flush()?;
242        *current = new_file;
243
244        let mut stats = self.stats.write();
245        stats.files_rotated += 1;
246        stats.current_file_size = 0;
247        drop(stats);
248
249        // Clean up old WAL files
250        self.cleanup_old_files()?;
251
252        Ok(())
253    }
254
255    /// Clean up old WAL files beyond the retention limit
256    fn cleanup_old_files(&self) -> Result<()> {
257        let mut wal_files = self.list_wal_files()?;
258        wal_files.sort();
259
260        if wal_files.len() > self.config.max_wal_files {
261            let to_remove = wal_files.len() - self.config.max_wal_files;
262            let files_to_delete = &wal_files[..to_remove];
263
264            for file_path in files_to_delete {
265                if let Err(e) = fs::remove_file(file_path) {
266                    tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
267                } else {
268                    tracing::debug!("๐Ÿ—‘๏ธ Removed old WAL file: {:?}", file_path);
269                    let mut stats = self.stats.write();
270                    stats.files_cleaned += 1;
271                }
272            }
273        }
274
275        Ok(())
276    }
277
278    /// List all WAL files in the directory
279    fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
280        let entries = fs::read_dir(&self.wal_dir)
281            .map_err(|e| AllSourceError::StorageError(format!("Failed to read WAL directory: {}", e)))?;
282
283        let mut wal_files = Vec::new();
284        for entry in entries {
285            let entry = entry.map_err(|e| {
286                AllSourceError::StorageError(format!("Failed to read directory entry: {}", e))
287            })?;
288
289            let path = entry.path();
290            if let Some(name) = path.file_name() {
291                if name.to_string_lossy().starts_with("wal-") && name.to_string_lossy().ends_with(".log") {
292                    wal_files.push(path);
293                }
294            }
295        }
296
297        Ok(wal_files)
298    }
299
300    /// Recover events from WAL files
301    pub fn recover(&self) -> Result<Vec<Event>> {
302        tracing::info!("๐Ÿ”„ Starting WAL recovery...");
303
304        let mut wal_files = self.list_wal_files()?;
305        wal_files.sort();
306
307        let mut recovered_events = Vec::new();
308        let mut max_sequence = 0u64;
309        let mut corrupted_entries = 0;
310
311        for wal_file_path in &wal_files {
312            tracing::debug!("Reading WAL file: {:?}", wal_file_path);
313
314            let file = File::open(wal_file_path).map_err(|e| {
315                AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {}", e))
316            })?;
317
318            let reader = BufReader::new(file);
319
320            for (line_num, line) in reader.lines().enumerate() {
321                let line = line.map_err(|e| {
322                    AllSourceError::StorageError(format!("Failed to read WAL line: {}", e))
323                })?;
324
325                if line.trim().is_empty() {
326                    continue;
327                }
328
329                match serde_json::from_str::<WALEntry>(&line) {
330                    Ok(entry) => {
331                        // Verify checksum
332                        if !entry.verify() {
333                            tracing::warn!(
334                                "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
335                                wal_file_path,
336                                line_num + 1
337                            );
338                            corrupted_entries += 1;
339                            continue;
340                        }
341
342                        max_sequence = max_sequence.max(entry.sequence);
343                        recovered_events.push(entry.event);
344                    }
345                    Err(e) => {
346                        tracing::warn!(
347                            "Failed to parse WAL entry at {:?}:{}: {}",
348                            wal_file_path,
349                            line_num + 1,
350                            e
351                        );
352                        corrupted_entries += 1;
353                    }
354                }
355            }
356        }
357
358        // Update sequence counter
359        let mut seq = self.sequence.write();
360        *seq = max_sequence;
361        drop(seq);
362
363        // Update stats
364        let mut stats = self.stats.write();
365        stats.recovery_count += 1;
366        drop(stats);
367
368        tracing::info!(
369            "โœ… WAL recovery complete: {} events recovered, {} corrupted entries",
370            recovered_events.len(),
371            corrupted_entries
372        );
373
374        Ok(recovered_events)
375    }
376
377    /// Manually flush the current WAL file
378    pub fn flush(&self) -> Result<()> {
379        let mut current = self.current_file.write();
380        current.flush()?;
381        Ok(())
382    }
383
384    /// Truncate WAL after successful checkpoint
385    pub fn truncate(&self) -> Result<()> {
386        tracing::info!("๐Ÿงน Truncating WAL after checkpoint");
387
388        // Close current file
389        let mut current = self.current_file.write();
390        current.flush()?;
391
392        // Remove all WAL files
393        let wal_files = self.list_wal_files()?;
394        for file_path in wal_files {
395            fs::remove_file(&file_path).map_err(|e| {
396                AllSourceError::StorageError(format!("Failed to remove WAL file: {}", e))
397            })?;
398            tracing::debug!("Removed WAL file: {:?}", file_path);
399        }
400
401        // Create new WAL file
402        let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
403        *current = WALFile::new(new_file_path)?;
404
405        // Reset sequence
406        let mut seq = self.sequence.write();
407        *seq = 0;
408
409        tracing::info!("โœ… WAL truncated successfully");
410
411        Ok(())
412    }
413
414    /// Get WAL statistics
415    pub fn stats(&self) -> WALStats {
416        (*self.stats.read()).clone()
417    }
418
419    /// Get current sequence number
420    pub fn current_sequence(&self) -> u64 {
421        *self.sequence.read()
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Build a throwaway event with a fresh id; all other fields are fixed.
    fn create_test_event() -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    #[test]
    fn test_wal_creation() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default());
        assert!(wal.is_ok());
    }

    #[test]
    fn test_wal_append() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        let event = create_test_event();
        let seq = wal.append(event);
        assert!(seq.is_ok());
        assert_eq!(seq.unwrap(), 1);

        let stats = wal.stats();
        assert_eq!(stats.total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        // Write some events
        for _ in 0..5 {
            wal.append(create_test_event()).unwrap();
        }

        wal.flush().unwrap();

        // Create new WAL instance (simulating restart)
        let wal2 = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
        let recovered = wal2.recover().unwrap();

        assert_eq!(recovered.len(), 5);
        // Recovery must also restore the sequence counter so new appends keep numbering.
        assert_eq!(wal2.current_sequence(), 5);
    }

    #[test]
    fn test_wal_recovery_skips_unparseable_lines() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        for _ in 0..3 {
            wal.append(create_test_event()).unwrap();
        }
        wal.flush().unwrap();

        // A WAL file holding garbage must be skipped, not abort recovery.
        fs::write(temp_dir.path().join("wal-ffffffffffffffff.log"), "not json\n").unwrap();

        let wal2 = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();
        let recovered = wal2.recover().unwrap();

        assert_eq!(recovered.len(), 3);
        assert_eq!(wal2.current_sequence(), 3);
    }

    #[test]
    fn test_wal_rotation() {
        let temp_dir = TempDir::new().unwrap();
        let config = WALConfig {
            max_file_size: 1024, // Small size to trigger rotation
            ..Default::default()
        };

        let wal = WriteAheadLog::new(temp_dir.path(), config).unwrap();

        // Write enough events to trigger rotation
        for _ in 0..50 {
            wal.append(create_test_event()).unwrap();
        }

        let stats = wal.stats();
        assert!(stats.files_rotated > 0);
    }

    #[test]
    fn test_wal_cleanup_respects_max_files() {
        let temp_dir = TempDir::new().unwrap();
        let config = WALConfig {
            max_file_size: 1024, // Small size to force several rotations
            max_wal_files: 2,
            ..Default::default()
        };

        let wal = WriteAheadLog::new(temp_dir.path(), config).unwrap();
        for _ in 0..50 {
            wal.append(create_test_event()).unwrap();
        }

        let remaining = wal.list_wal_files().unwrap();
        assert!(
            remaining.len() <= 2,
            "expected at most 2 WAL files, found {}",
            remaining.len()
        );
        assert!(wal.stats().files_cleaned > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let event = create_test_event();
        let entry = WALEntry::new(1, event);

        assert!(entry.verify());

        // Modify and verify it fails
        let mut corrupted = entry.clone();
        corrupted.checksum = 0;
        assert!(!corrupted.verify());
    }

    #[test]
    fn test_wal_truncate() {
        let temp_dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(temp_dir.path(), WALConfig::default()).unwrap();

        // Write events
        for _ in 0..5 {
            wal.append(create_test_event()).unwrap();
        }

        // Truncate
        wal.truncate().unwrap();

        // Verify sequence is reset
        assert_eq!(wal.current_sequence(), 0);

        // Verify recovery returns empty
        let recovered = wal.recover().unwrap();
        assert_eq!(recovered.len(), 0);
    }
}