Skip to main content

allsource_core/infrastructure/persistence/
wal.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use chrono::{DateTime, Utc};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use std::{
9    fs::{self, File, OpenOptions},
10    io::{BufRead, BufReader, BufWriter, Write},
11    path::{Path, PathBuf},
12    sync::Arc,
13};
14
15/// Write-Ahead Log for durability and crash recovery
16pub struct WriteAheadLog {
17    /// Directory where WAL files are stored
18    wal_dir: PathBuf,
19
20    /// Current active WAL file
21    current_file: Arc<RwLock<WALFile>>,
22
23    /// Configuration
24    config: WALConfig,
25
26    /// Statistics
27    stats: Arc<RwLock<WALStats>>,
28
29    /// Current sequence number
30    sequence: Arc<RwLock<u64>>,
31
32    /// Optional broadcast channel for replication โ€” when set, each WAL append
33    /// publishes the entry so the WAL shipper can stream it to followers.
34    /// Wrapped in Mutex so it can be set at runtime (e.g. during follower โ†’ leader promotion).
35    replication_tx: parking_lot::Mutex<Option<tokio::sync::broadcast::Sender<WALEntry>>>,
36}
37
/// Tunable settings for the write-ahead log.
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Size threshold, in bytes, at which the active WAL file is rotated.
    pub max_file_size: usize,

    /// When `true`, every write is followed by a flush and an fsync.
    pub sync_on_write: bool,

    /// How many WAL files are retained before the oldest are deleted.
    pub max_wal_files: usize,

    /// Enable WAL compression.
    /// NOTE(review): nothing in this file reads this flag — confirm where it
    /// is honoured.
    pub compress: bool,

    /// Interval in milliseconds for background coalesced fsync.
    /// When set, a background task is expected to call `flush() + sync_all()`
    /// every N ms, trading a bounded data-loss window for lower write latency.
    /// When `Some`, `sync_on_write` is meant to be treated as `false` to avoid
    /// a double fsync.
    /// NOTE(review): this struct does not enforce that itself — verify the
    /// consumer does.
    pub fsync_interval_ms: Option<u64>,
}

impl Default for WALConfig {
    /// Defaults: 64 MB files, fsync on every write, ten retained files, no
    /// compression, and no background fsync interval.
    fn default() -> Self {
        const SIXTY_FOUR_MB: usize = 64 * 1024 * 1024;

        Self {
            max_file_size: SIXTY_FOUR_MB,
            sync_on_write: true,
            max_wal_files: 10,
            compress: false,
            fsync_interval_ms: None,
        }
    }
}
70
71#[derive(Debug, Clone, Default, Serialize)]
72pub struct WALStats {
73    pub total_entries: u64,
74    pub total_bytes_written: u64,
75    pub current_file_size: usize,
76    pub files_rotated: u64,
77    pub files_cleaned: u64,
78    pub recovery_count: u64,
79}
80
81/// WAL entry wrapping an event
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct WALEntry {
84    /// Sequence number for ordering
85    pub sequence: u64,
86
87    /// Timestamp when written to WAL
88    pub wal_timestamp: DateTime<Utc>,
89
90    /// The event being logged
91    pub event: Event,
92
93    /// Checksum for integrity verification
94    pub checksum: u32,
95}
96
97impl WALEntry {
98    pub fn new(sequence: u64, event: Event) -> Self {
99        let mut entry = Self {
100            sequence,
101            wal_timestamp: Utc::now(),
102            event,
103            checksum: 0,
104        };
105        entry.checksum = entry.calculate_checksum();
106        entry
107    }
108
109    fn calculate_checksum(&self) -> u32 {
110        // Simple CRC32 checksum
111        let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
112        crc32fast::hash(data.as_bytes())
113    }
114
115    pub fn verify(&self) -> bool {
116        self.checksum == self.calculate_checksum()
117    }
118}
119
120/// Represents an active WAL file
121struct WALFile {
122    path: PathBuf,
123    writer: BufWriter<File>,
124    size: usize,
125    created_at: DateTime<Utc>,
126}
127
128impl WALFile {
129    fn new(path: PathBuf) -> Result<Self> {
130        let file = OpenOptions::new()
131            .create(true)
132            .append(true)
133            .open(&path)
134            .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {e}")))?;
135
136        let size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
137
138        Ok(Self {
139            path,
140            writer: BufWriter::new(file),
141            size,
142            created_at: Utc::now(),
143        })
144    }
145
146    fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
147        // Serialize entry as JSON line
148        let json = serde_json::to_string(entry)?;
149
150        let line = format!("{json}\n");
151        let bytes_written = line.len();
152
153        self.writer
154            .write_all(line.as_bytes())
155            .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {e}")))?;
156
157        if sync {
158            self.writer
159                .flush()
160                .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
161
162            self.writer
163                .get_ref()
164                .sync_all()
165                .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {e}")))?;
166        }
167
168        self.size += bytes_written;
169        Ok(bytes_written)
170    }
171
172    fn flush(&mut self) -> Result<()> {
173        self.writer
174            .flush()
175            .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
176        Ok(())
177    }
178}
179
180impl WriteAheadLog {
181    /// Create a new WAL
182    pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
183        let wal_dir = wal_dir.into();
184
185        // Create WAL directory if it doesn't exist
186        fs::create_dir_all(&wal_dir).map_err(|e| {
187            AllSourceError::StorageError(format!("Failed to create WAL directory: {e}"))
188        })?;
189
190        // Create initial WAL file
191        let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
192        let current_file = WALFile::new(initial_file_path)?;
193
194        tracing::info!("โœ… WAL initialized at: {}", wal_dir.display());
195
196        Ok(Self {
197            wal_dir,
198            current_file: Arc::new(RwLock::new(current_file)),
199            config,
200            stats: Arc::new(RwLock::new(WALStats::default())),
201            sequence: Arc::new(RwLock::new(0)),
202            replication_tx: parking_lot::Mutex::new(None),
203        })
204    }
205
206    /// Generate a WAL filename based on sequence
207    fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
208        dir.join(format!("wal-{sequence:016x}.log"))
209    }
210
211    /// Write an event to the WAL
212    #[cfg_attr(feature = "hotpath", hotpath::measure)]
213    pub fn append(&self, event: Event) -> Result<u64> {
214        // Get next sequence number
215        let mut seq = self.sequence.write();
216        *seq += 1;
217        let sequence = *seq;
218        drop(seq);
219
220        // Create WAL entry
221        let entry = WALEntry::new(sequence, event);
222
223        // Write to current file
224        let mut current = self.current_file.write();
225        let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
226
227        // Update statistics
228        let mut stats = self.stats.write();
229        stats.total_entries += 1;
230        stats.total_bytes_written += bytes_written as u64;
231        stats.current_file_size = current.size;
232        drop(stats);
233
234        // Broadcast to replication channel (if enabled).
235        // Errors are ignored: no receivers simply means no followers are connected.
236        if let Some(ref tx) = *self.replication_tx.lock() {
237            let _ = tx.send(entry);
238        }
239
240        // Check if we need to rotate
241        let should_rotate = current.size >= self.config.max_file_size;
242        drop(current);
243
244        if should_rotate {
245            self.rotate()?;
246        }
247
248        tracing::trace!("WAL entry written: sequence={}", sequence);
249
250        Ok(sequence)
251    }
252
253    /// Rotate to a new WAL file
254    #[cfg_attr(feature = "hotpath", hotpath::measure)]
255    fn rotate(&self) -> Result<()> {
256        let seq = *self.sequence.read();
257        let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
258
259        tracing::info!("๐Ÿ”„ Rotating WAL to new file: {:?}", new_file_path);
260
261        let new_file = WALFile::new(new_file_path)?;
262
263        let mut current = self.current_file.write();
264        current.flush()?;
265        *current = new_file;
266
267        let mut stats = self.stats.write();
268        stats.files_rotated += 1;
269        stats.current_file_size = 0;
270        drop(stats);
271
272        // Clean up old WAL files
273        self.cleanup_old_files()?;
274
275        Ok(())
276    }
277
278    /// Clean up old WAL files beyond the retention limit
279    #[cfg_attr(feature = "hotpath", hotpath::measure)]
280    fn cleanup_old_files(&self) -> Result<()> {
281        let mut wal_files = self.list_wal_files()?;
282        wal_files.sort();
283
284        if wal_files.len() > self.config.max_wal_files {
285            let to_remove = wal_files.len() - self.config.max_wal_files;
286            let files_to_delete = &wal_files[..to_remove];
287
288            for file_path in files_to_delete {
289                if let Err(e) = fs::remove_file(file_path) {
290                    tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
291                } else {
292                    tracing::debug!("๐Ÿ—‘๏ธ Removed old WAL file: {:?}", file_path);
293                    let mut stats = self.stats.write();
294                    stats.files_cleaned += 1;
295                }
296            }
297        }
298
299        Ok(())
300    }
301
302    /// List all WAL files in the directory
303    fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
304        let entries = fs::read_dir(&self.wal_dir).map_err(|e| {
305            AllSourceError::StorageError(format!("Failed to read WAL directory: {e}"))
306        })?;
307
308        let mut wal_files = Vec::new();
309        for entry in entries {
310            let entry = entry.map_err(|e| {
311                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
312            })?;
313
314            let path = entry.path();
315            if let Some(name) = path.file_name()
316                && name.to_string_lossy().starts_with("wal-")
317                && name.to_string_lossy().ends_with(".log")
318            {
319                wal_files.push(path);
320            }
321        }
322
323        Ok(wal_files)
324    }
325
326    /// Recover events from WAL files
327    #[cfg_attr(feature = "hotpath", hotpath::measure)]
328    pub fn recover(&self) -> Result<Vec<Event>> {
329        tracing::info!("๐Ÿ”„ Starting WAL recovery...");
330
331        let mut wal_files = self.list_wal_files()?;
332        wal_files.sort();
333
334        let mut recovered_events = Vec::new();
335        let mut max_sequence = 0u64;
336        let mut corrupted_entries = 0;
337
338        for wal_file_path in &wal_files {
339            tracing::debug!("Reading WAL file: {:?}", wal_file_path);
340
341            let file = File::open(wal_file_path).map_err(|e| {
342                AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {e}"))
343            })?;
344
345            let reader = BufReader::new(file);
346
347            for (line_num, line) in reader.lines().enumerate() {
348                let line = match line {
349                    Ok(l) => l,
350                    Err(e) => {
351                        tracing::warn!(
352                            "I/O error reading WAL line at {:?}:{}: {}",
353                            wal_file_path,
354                            line_num + 1,
355                            e
356                        );
357                        corrupted_entries += 1;
358                        continue;
359                    }
360                };
361
362                if line.trim().is_empty() {
363                    continue;
364                }
365
366                match serde_json::from_str::<WALEntry>(&line) {
367                    Ok(entry) => {
368                        // Verify checksum
369                        if !entry.verify() {
370                            tracing::warn!(
371                                "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
372                                wal_file_path,
373                                line_num + 1
374                            );
375                            corrupted_entries += 1;
376                            continue;
377                        }
378
379                        max_sequence = max_sequence.max(entry.sequence);
380                        recovered_events.push(entry.event);
381                    }
382                    Err(e) => {
383                        tracing::warn!(
384                            "Failed to parse WAL entry at {:?}:{}: {}",
385                            wal_file_path,
386                            line_num + 1,
387                            e
388                        );
389                        corrupted_entries += 1;
390                    }
391                }
392            }
393        }
394
395        // Update sequence counter
396        let mut seq = self.sequence.write();
397        *seq = max_sequence;
398        drop(seq);
399
400        // Update stats
401        let mut stats = self.stats.write();
402        stats.recovery_count += 1;
403        drop(stats);
404
405        tracing::info!(
406            "โœ… WAL recovery complete: {} events recovered, {} corrupted entries",
407            recovered_events.len(),
408            corrupted_entries
409        );
410
411        Ok(recovered_events)
412    }
413
414    /// Manually flush the current WAL file
415    #[cfg_attr(feature = "hotpath", hotpath::measure)]
416    pub fn flush(&self) -> Result<()> {
417        let mut current = self.current_file.write();
418        current.flush()?;
419        Ok(())
420    }
421
422    /// Flush the BufWriter and fsync the current WAL file to disk.
423    ///
424    /// Called by the background interval-based fsync task. Acquires the write
425    /// lock, flushes buffered data, then issues `sync_all()` to ensure the
426    /// OS has persisted the data to durable storage.
427    #[cfg_attr(feature = "hotpath", hotpath::measure)]
428    pub fn sync(&self) -> Result<()> {
429        let mut current = self.current_file.write();
430        current
431            .writer
432            .flush()
433            .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
434        current
435            .writer
436            .get_ref()
437            .sync_all()
438            .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {e}")))?;
439        Ok(())
440    }
441
442    /// Get the configured fsync interval (if any).
443    pub fn fsync_interval_ms(&self) -> Option<u64> {
444        self.config.fsync_interval_ms
445    }
446
447    /// Truncate WAL after successful checkpoint
448    #[cfg_attr(feature = "hotpath", hotpath::measure)]
449    pub fn truncate(&self) -> Result<()> {
450        tracing::info!("๐Ÿงน Truncating WAL after checkpoint");
451
452        // Close current file
453        let mut current = self.current_file.write();
454        current.flush()?;
455
456        // Remove all WAL files
457        let wal_files = self.list_wal_files()?;
458        for file_path in wal_files {
459            fs::remove_file(&file_path).map_err(|e| {
460                AllSourceError::StorageError(format!("Failed to remove WAL file: {e}"))
461            })?;
462            tracing::debug!("Removed WAL file: {:?}", file_path);
463        }
464
465        // Create new WAL file
466        let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
467        *current = WALFile::new(new_file_path)?;
468
469        // Reset sequence
470        let mut seq = self.sequence.write();
471        *seq = 0;
472
473        tracing::info!("โœ… WAL truncated successfully");
474
475        Ok(())
476    }
477
478    /// Get WAL statistics
479    pub fn stats(&self) -> WALStats {
480        (*self.stats.read()).clone()
481    }
482
483    /// Get current sequence number
484    pub fn current_sequence(&self) -> u64 {
485        *self.sequence.read()
486    }
487
488    /// Get the oldest available WAL sequence number.
489    ///
490    /// Returns the first sequence found in the oldest WAL file, or `None`
491    /// if no WAL entries exist. Used by the replication catch-up protocol
492    /// to determine whether a follower can catch up from WAL alone.
493    pub fn oldest_sequence(&self) -> Option<u64> {
494        let Ok(mut wal_files) = self.list_wal_files() else {
495            return None;
496        };
497
498        if wal_files.is_empty() {
499            return None;
500        }
501
502        wal_files.sort();
503
504        // Read the first entry from the oldest WAL file
505        for wal_file_path in &wal_files {
506            let Ok(file) = File::open(wal_file_path) else {
507                continue;
508            };
509            let reader = BufReader::new(file);
510            for line in reader.lines() {
511                let Ok(line) = line else {
512                    continue;
513                };
514                if line.trim().is_empty() {
515                    continue;
516                }
517                if let Ok(entry) = serde_json::from_str::<WALEntry>(&line) {
518                    return Some(entry.sequence);
519                }
520            }
521        }
522
523        None
524    }
525
526    /// Attach a broadcast sender for WAL replication.
527    ///
528    /// When set, every `append()` call will publish the WAL entry to this
529    /// channel so the WAL shipper can stream it to connected followers.
530    pub fn set_replication_tx(&self, tx: tokio::sync::broadcast::Sender<WALEntry>) {
531        *self.replication_tx.lock() = Some(tx);
532    }
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a throwaway event with a random id.
    fn create_test_event() -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            json!({"test": "data"}),
            Utc::now(),
            None,
            1,
        )
    }

    /// Opens a WAL in `dir` with `config`, panicking if that fails.
    fn open_wal(dir: &TempDir, config: WALConfig) -> WriteAheadLog {
        WriteAheadLog::new(dir.path(), config).unwrap()
    }

    /// Appends `count` fresh test events, panicking on the first failure.
    fn append_events(wal: &WriteAheadLog, count: usize) {
        for _ in 0..count {
            wal.append(create_test_event()).unwrap();
        }
    }

    #[test]
    fn test_wal_creation() {
        let dir = TempDir::new().unwrap();
        assert!(WriteAheadLog::new(dir.path(), WALConfig::default()).is_ok());
    }

    #[test]
    fn test_wal_append() {
        let dir = TempDir::new().unwrap();
        let wal = open_wal(&dir, WALConfig::default());

        let outcome = wal.append(create_test_event());
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 1);

        assert_eq!(wal.stats().total_entries, 1);
    }

    #[test]
    fn test_wal_recovery() {
        let dir = TempDir::new().unwrap();
        let writer = open_wal(&dir, WALConfig::default());

        append_events(&writer, 5);
        writer.flush().unwrap();

        // A second instance over the same directory simulates a restart.
        let restarted = open_wal(&dir, WALConfig::default());
        let recovered = restarted.recover().unwrap();

        assert_eq!(recovered.len(), 5);
    }

    #[test]
    fn test_wal_recovery_with_partial_write() {
        let dir = TempDir::new().unwrap();
        let writer = open_wal(&dir, WALConfig::default());

        // Three complete events first.
        append_events(&writer, 3);
        writer.flush().unwrap();

        // Then simulate a partial write by appending malformed data.
        let wal_file_path = dir.path().join("wal-0000000000000000.log");
        use std::io::Write as _;
        let mut raw = std::fs::OpenOptions::new()
            .append(true)
            .open(&wal_file_path)
            .unwrap();
        raw.write_all(b"{\"partial\": true, \"seq\"\n").unwrap(); // truncated JSON
        drop(raw);

        // Recovery must succeed and ignore the malformed line.
        let restarted = open_wal(&dir, WALConfig::default());
        let recovered = restarted.recover().unwrap();
        assert_eq!(
            recovered.len(),
            3,
            "Should recover only the 3 valid events, not the partial one"
        );
    }

    #[test]
    fn test_wal_rotation() {
        let dir = TempDir::new().unwrap();
        // A tiny size limit forces rotation quickly.
        let small_files = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };
        let wal = open_wal(&dir, small_files);

        append_events(&wal, 50);

        assert!(wal.stats().files_rotated > 0);
    }

    #[test]
    fn test_wal_entry_checksum() {
        let entry = WALEntry::new(1, create_test_event());
        assert!(entry.verify());

        // Clobbering the stored checksum must make verification fail.
        let mut corrupted = entry.clone();
        corrupted.checksum = 0;
        assert!(!corrupted.verify());
    }

    #[test]
    fn test_wal_fsync_interval_config() {
        let with_interval = WALConfig {
            fsync_interval_ms: Some(100),
            ..Default::default()
        };
        assert_eq!(with_interval.fsync_interval_ms, Some(100));

        // The default has no interval.
        assert_eq!(WALConfig::default().fsync_interval_ms, None);
    }

    #[test]
    fn test_wal_sync_method() {
        let dir = TempDir::new().unwrap();
        // Per-write sync is disabled so the explicit sync is what gets tested.
        let no_sync = WALConfig {
            sync_on_write: false,
            ..Default::default()
        };
        let wal = open_wal(&dir, no_sync);

        append_events(&wal, 5);

        // Explicit sync — should flush + fsync without error.
        wal.sync().unwrap();

        // The data must be readable from a new WAL instance.
        let restarted = open_wal(&dir, WALConfig::default());
        assert_eq!(restarted.recover().unwrap().len(), 5);
    }

    #[test]
    fn test_wal_truncate() {
        let dir = TempDir::new().unwrap();
        let wal = open_wal(&dir, WALConfig::default());

        append_events(&wal, 5);
        wal.truncate().unwrap();

        // The sequence is reset...
        assert_eq!(wal.current_sequence(), 0);

        // ...and nothing is left to recover.
        let recovered = wal.recover().unwrap();
        assert_eq!(recovered.len(), 0);
    }
}