Skip to main content

allsource_core/infrastructure/persistence/
wal.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use chrono::{DateTime, Utc};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use std::{
9    fs::{self, File, OpenOptions},
10    io::{BufRead, BufReader, BufWriter, Write},
11    path::{Path, PathBuf},
12    sync::Arc,
13};
14
15/// Write-Ahead Log for durability and crash recovery
16pub struct WriteAheadLog {
17    /// Directory where WAL files are stored
18    wal_dir: PathBuf,
19
20    /// Current active WAL file
21    current_file: Arc<RwLock<WALFile>>,
22
23    /// Configuration
24    config: WALConfig,
25
26    /// Statistics
27    stats: Arc<RwLock<WALStats>>,
28
29    /// Current sequence number
30    sequence: Arc<RwLock<u64>>,
31
32    /// Optional broadcast channel for replication โ€” when set, each WAL append
33    /// publishes the entry so the WAL shipper can stream it to followers.
34    /// Wrapped in Mutex so it can be set at runtime (e.g. during follower โ†’ leader promotion).
35    replication_tx: parking_lot::Mutex<Option<tokio::sync::broadcast::Sender<WALEntry>>>,
36}
37
/// Tunable settings for a [`WriteAheadLog`].
#[derive(Debug, Clone)]
pub struct WALConfig {
    /// Size threshold, in bytes, at which the active WAL file is rotated.
    /// The check runs after each append, so a file may exceed this by up to
    /// one entry.
    pub max_file_size: usize,

    /// When `true`, every append flushes the buffer and fsyncs the file
    /// before returning.
    pub sync_on_write: bool,

    /// Upper bound on the number of WAL files retained; the oldest files are
    /// removed after a rotation once this count is exceeded.
    pub max_wal_files: usize,

    /// Enable WAL compression.
    ///
    /// NOTE(review): no code in this file reads this flag — confirm whether
    /// compression is implemented elsewhere or still pending.
    pub compress: bool,
}
52
53impl Default for WALConfig {
54    fn default() -> Self {
55        Self {
56            max_file_size: 64 * 1024 * 1024, // 64 MB
57            sync_on_write: true,
58            max_wal_files: 10,
59            compress: false,
60        }
61    }
62}
63
64#[derive(Debug, Clone, Default, Serialize)]
65pub struct WALStats {
66    pub total_entries: u64,
67    pub total_bytes_written: u64,
68    pub current_file_size: usize,
69    pub files_rotated: u64,
70    pub files_cleaned: u64,
71    pub recovery_count: u64,
72}
73
74/// WAL entry wrapping an event
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct WALEntry {
77    /// Sequence number for ordering
78    pub sequence: u64,
79
80    /// Timestamp when written to WAL
81    pub wal_timestamp: DateTime<Utc>,
82
83    /// The event being logged
84    pub event: Event,
85
86    /// Checksum for integrity verification
87    pub checksum: u32,
88}
89
90impl WALEntry {
91    pub fn new(sequence: u64, event: Event) -> Self {
92        let mut entry = Self {
93            sequence,
94            wal_timestamp: Utc::now(),
95            event,
96            checksum: 0,
97        };
98        entry.checksum = entry.calculate_checksum();
99        entry
100    }
101
102    fn calculate_checksum(&self) -> u32 {
103        // Simple CRC32 checksum
104        let data = format!("{}{}{}", self.sequence, self.wal_timestamp, self.event.id);
105        crc32fast::hash(data.as_bytes())
106    }
107
108    pub fn verify(&self) -> bool {
109        self.checksum == self.calculate_checksum()
110    }
111}
112
113/// Represents an active WAL file
114struct WALFile {
115    path: PathBuf,
116    writer: BufWriter<File>,
117    size: usize,
118    created_at: DateTime<Utc>,
119}
120
121impl WALFile {
122    fn new(path: PathBuf) -> Result<Self> {
123        let file = OpenOptions::new()
124            .create(true)
125            .append(true)
126            .open(&path)
127            .map_err(|e| AllSourceError::StorageError(format!("Failed to open WAL file: {e}")))?;
128
129        let size = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
130
131        Ok(Self {
132            path,
133            writer: BufWriter::new(file),
134            size,
135            created_at: Utc::now(),
136        })
137    }
138
139    fn write_entry(&mut self, entry: &WALEntry, sync: bool) -> Result<usize> {
140        // Serialize entry as JSON line
141        let json = serde_json::to_string(entry)?;
142
143        let line = format!("{}\n", json);
144        let bytes_written = line.len();
145
146        self.writer
147            .write_all(line.as_bytes())
148            .map_err(|e| AllSourceError::StorageError(format!("Failed to write to WAL: {e}")))?;
149
150        if sync {
151            self.writer
152                .flush()
153                .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {e}")))?;
154
155            self.writer
156                .get_ref()
157                .sync_all()
158                .map_err(|e| AllSourceError::StorageError(format!("Failed to sync WAL: {e}")))?;
159        }
160
161        self.size += bytes_written;
162        Ok(bytes_written)
163    }
164
165    fn flush(&mut self) -> Result<()> {
166        self.writer
167            .flush()
168            .map_err(|e| AllSourceError::StorageError(format!("Failed to flush WAL: {}", e)))?;
169        Ok(())
170    }
171}
172
173impl WriteAheadLog {
174    /// Create a new WAL
175    pub fn new(wal_dir: impl Into<PathBuf>, config: WALConfig) -> Result<Self> {
176        let wal_dir = wal_dir.into();
177
178        // Create WAL directory if it doesn't exist
179        fs::create_dir_all(&wal_dir).map_err(|e| {
180            AllSourceError::StorageError(format!("Failed to create WAL directory: {e}"))
181        })?;
182
183        // Create initial WAL file
184        let initial_file_path = Self::generate_wal_filename(&wal_dir, 0);
185        let current_file = WALFile::new(initial_file_path)?;
186
187        tracing::info!("โœ… WAL initialized at: {}", wal_dir.display());
188
189        Ok(Self {
190            wal_dir,
191            current_file: Arc::new(RwLock::new(current_file)),
192            config,
193            stats: Arc::new(RwLock::new(WALStats::default())),
194            sequence: Arc::new(RwLock::new(0)),
195            replication_tx: parking_lot::Mutex::new(None),
196        })
197    }
198
199    /// Generate a WAL filename based on sequence
200    fn generate_wal_filename(dir: &Path, sequence: u64) -> PathBuf {
201        dir.join(format!("wal-{:016x}.log", sequence))
202    }
203
204    /// Write an event to the WAL
205    pub fn append(&self, event: Event) -> Result<u64> {
206        // Get next sequence number
207        let mut seq = self.sequence.write();
208        *seq += 1;
209        let sequence = *seq;
210        drop(seq);
211
212        // Create WAL entry
213        let entry = WALEntry::new(sequence, event);
214
215        // Write to current file
216        let mut current = self.current_file.write();
217        let bytes_written = current.write_entry(&entry, self.config.sync_on_write)?;
218
219        // Update statistics
220        let mut stats = self.stats.write();
221        stats.total_entries += 1;
222        stats.total_bytes_written += bytes_written as u64;
223        stats.current_file_size = current.size;
224        drop(stats);
225
226        // Broadcast to replication channel (if enabled).
227        // Errors are ignored: no receivers simply means no followers are connected.
228        if let Some(ref tx) = *self.replication_tx.lock() {
229            let _ = tx.send(entry);
230        }
231
232        // Check if we need to rotate
233        let should_rotate = current.size >= self.config.max_file_size;
234        drop(current);
235
236        if should_rotate {
237            self.rotate()?;
238        }
239
240        tracing::trace!("WAL entry written: sequence={}", sequence);
241
242        Ok(sequence)
243    }
244
245    /// Rotate to a new WAL file
246    fn rotate(&self) -> Result<()> {
247        let seq = *self.sequence.read();
248        let new_file_path = Self::generate_wal_filename(&self.wal_dir, seq);
249
250        tracing::info!("๐Ÿ”„ Rotating WAL to new file: {:?}", new_file_path);
251
252        let new_file = WALFile::new(new_file_path)?;
253
254        let mut current = self.current_file.write();
255        current.flush()?;
256        *current = new_file;
257
258        let mut stats = self.stats.write();
259        stats.files_rotated += 1;
260        stats.current_file_size = 0;
261        drop(stats);
262
263        // Clean up old WAL files
264        self.cleanup_old_files()?;
265
266        Ok(())
267    }
268
269    /// Clean up old WAL files beyond the retention limit
270    fn cleanup_old_files(&self) -> Result<()> {
271        let mut wal_files = self.list_wal_files()?;
272        wal_files.sort();
273
274        if wal_files.len() > self.config.max_wal_files {
275            let to_remove = wal_files.len() - self.config.max_wal_files;
276            let files_to_delete = &wal_files[..to_remove];
277
278            for file_path in files_to_delete {
279                if let Err(e) = fs::remove_file(file_path) {
280                    tracing::warn!("Failed to remove old WAL file {:?}: {}", file_path, e);
281                } else {
282                    tracing::debug!("๐Ÿ—‘๏ธ Removed old WAL file: {:?}", file_path);
283                    let mut stats = self.stats.write();
284                    stats.files_cleaned += 1;
285                }
286            }
287        }
288
289        Ok(())
290    }
291
292    /// List all WAL files in the directory
293    fn list_wal_files(&self) -> Result<Vec<PathBuf>> {
294        let entries = fs::read_dir(&self.wal_dir).map_err(|e| {
295            AllSourceError::StorageError(format!("Failed to read WAL directory: {e}"))
296        })?;
297
298        let mut wal_files = Vec::new();
299        for entry in entries {
300            let entry = entry.map_err(|e| {
301                AllSourceError::StorageError(format!("Failed to read directory entry: {e}"))
302            })?;
303
304            let path = entry.path();
305            if let Some(name) = path.file_name()
306                && name.to_string_lossy().starts_with("wal-")
307                && name.to_string_lossy().ends_with(".log")
308            {
309                wal_files.push(path);
310            }
311        }
312
313        Ok(wal_files)
314    }
315
316    /// Recover events from WAL files
317    pub fn recover(&self) -> Result<Vec<Event>> {
318        tracing::info!("๐Ÿ”„ Starting WAL recovery...");
319
320        let mut wal_files = self.list_wal_files()?;
321        wal_files.sort();
322
323        let mut recovered_events = Vec::new();
324        let mut max_sequence = 0u64;
325        let mut corrupted_entries = 0;
326
327        for wal_file_path in &wal_files {
328            tracing::debug!("Reading WAL file: {:?}", wal_file_path);
329
330            let file = File::open(wal_file_path).map_err(|e| {
331                AllSourceError::StorageError(format!("Failed to open WAL file for recovery: {e}"))
332            })?;
333
334            let reader = BufReader::new(file);
335
336            for (line_num, line) in reader.lines().enumerate() {
337                let line = line.map_err(|e| {
338                    AllSourceError::StorageError(format!("Failed to read WAL line: {e}"))
339                })?;
340
341                if line.trim().is_empty() {
342                    continue;
343                }
344
345                match serde_json::from_str::<WALEntry>(&line) {
346                    Ok(entry) => {
347                        // Verify checksum
348                        if !entry.verify() {
349                            tracing::warn!(
350                                "Corrupted WAL entry at {:?}:{} (checksum mismatch)",
351                                wal_file_path,
352                                line_num + 1
353                            );
354                            corrupted_entries += 1;
355                            continue;
356                        }
357
358                        max_sequence = max_sequence.max(entry.sequence);
359                        recovered_events.push(entry.event);
360                    }
361                    Err(e) => {
362                        tracing::warn!(
363                            "Failed to parse WAL entry at {:?}:{}: {}",
364                            wal_file_path,
365                            line_num + 1,
366                            e
367                        );
368                        corrupted_entries += 1;
369                    }
370                }
371            }
372        }
373
374        // Update sequence counter
375        let mut seq = self.sequence.write();
376        *seq = max_sequence;
377        drop(seq);
378
379        // Update stats
380        let mut stats = self.stats.write();
381        stats.recovery_count += 1;
382        drop(stats);
383
384        tracing::info!(
385            "โœ… WAL recovery complete: {} events recovered, {} corrupted entries",
386            recovered_events.len(),
387            corrupted_entries
388        );
389
390        Ok(recovered_events)
391    }
392
393    /// Manually flush the current WAL file
394    pub fn flush(&self) -> Result<()> {
395        let mut current = self.current_file.write();
396        current.flush()?;
397        Ok(())
398    }
399
400    /// Truncate WAL after successful checkpoint
401    pub fn truncate(&self) -> Result<()> {
402        tracing::info!("๐Ÿงน Truncating WAL after checkpoint");
403
404        // Close current file
405        let mut current = self.current_file.write();
406        current.flush()?;
407
408        // Remove all WAL files
409        let wal_files = self.list_wal_files()?;
410        for file_path in wal_files {
411            fs::remove_file(&file_path).map_err(|e| {
412                AllSourceError::StorageError(format!("Failed to remove WAL file: {e}"))
413            })?;
414            tracing::debug!("Removed WAL file: {:?}", file_path);
415        }
416
417        // Create new WAL file
418        let new_file_path = Self::generate_wal_filename(&self.wal_dir, 0);
419        *current = WALFile::new(new_file_path)?;
420
421        // Reset sequence
422        let mut seq = self.sequence.write();
423        *seq = 0;
424
425        tracing::info!("โœ… WAL truncated successfully");
426
427        Ok(())
428    }
429
430    /// Get WAL statistics
431    pub fn stats(&self) -> WALStats {
432        (*self.stats.read()).clone()
433    }
434
435    /// Get current sequence number
436    pub fn current_sequence(&self) -> u64 {
437        *self.sequence.read()
438    }
439
440    /// Get the oldest available WAL sequence number.
441    ///
442    /// Returns the first sequence found in the oldest WAL file, or `None`
443    /// if no WAL entries exist. Used by the replication catch-up protocol
444    /// to determine whether a follower can catch up from WAL alone.
445    pub fn oldest_sequence(&self) -> Option<u64> {
446        let mut wal_files = match self.list_wal_files() {
447            Ok(files) => files,
448            Err(_) => return None,
449        };
450
451        if wal_files.is_empty() {
452            return None;
453        }
454
455        wal_files.sort();
456
457        // Read the first entry from the oldest WAL file
458        for wal_file_path in &wal_files {
459            let file = match File::open(wal_file_path) {
460                Ok(f) => f,
461                Err(_) => continue,
462            };
463            let reader = BufReader::new(file);
464            for line in reader.lines() {
465                let line = match line {
466                    Ok(l) => l,
467                    Err(_) => continue,
468                };
469                if line.trim().is_empty() {
470                    continue;
471                }
472                if let Ok(entry) = serde_json::from_str::<WALEntry>(&line) {
473                    return Some(entry.sequence);
474                }
475            }
476        }
477
478        None
479    }
480
481    /// Attach a broadcast sender for WAL replication.
482    ///
483    /// When set, every `append()` call will publish the WAL entry to this
484    /// channel so the WAL shipper can stream it to connected followers.
485    pub fn set_replication_tx(&self, tx: tokio::sync::broadcast::Sender<WALEntry>) {
486        *self.replication_tx.lock() = Some(tx);
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a throwaway event with a fresh random id.
    fn create_test_event() -> Event {
        let payload = json!({"test": "data"});
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "test-entity".to_string(),
            "default".to_string(),
            payload,
            Utc::now(),
            None,
            1,
        )
    }

    /// Appends `count` fresh events, panicking on the first failure.
    fn append_events(wal: &WriteAheadLog, count: usize) {
        for _ in 0..count {
            wal.append(create_test_event()).unwrap();
        }
    }

    /// Constructing a WAL in an empty directory succeeds.
    #[test]
    fn test_wal_creation() {
        let dir = TempDir::new().unwrap();
        assert!(WriteAheadLog::new(dir.path(), WALConfig::default()).is_ok());
    }

    /// The first append gets sequence 1 and is reflected in the stats.
    #[test]
    fn test_wal_append() {
        let dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap();

        let first_sequence = wal.append(create_test_event()).unwrap();
        assert_eq!(first_sequence, 1);

        assert_eq!(wal.stats().total_entries, 1);
    }

    /// Entries written by one instance are recovered by a second instance
    /// opened on the same directory.
    #[test]
    fn test_wal_recovery() {
        let dir = TempDir::new().unwrap();
        let writer = WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap();

        append_events(&writer, 5);
        writer.flush().unwrap();

        // A second instance on the same directory simulates a restart.
        let restarted = WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap();
        let recovered = restarted.recover().unwrap();

        assert_eq!(recovered.len(), 5);
    }

    /// A small size limit forces at least one rotation.
    #[test]
    fn test_wal_rotation() {
        let dir = TempDir::new().unwrap();
        // 1 KiB is small enough that 50 events must overflow it.
        let config = WALConfig {
            max_file_size: 1024,
            ..Default::default()
        };
        let wal = WriteAheadLog::new(dir.path(), config).unwrap();

        append_events(&wal, 50);

        assert!(wal.stats().files_rotated > 0);
    }

    /// A fresh entry verifies; tampering with the checksum is detected.
    #[test]
    fn test_wal_entry_checksum() {
        let entry = WALEntry::new(1, create_test_event());
        assert!(entry.verify());

        let corrupted = WALEntry {
            checksum: 0,
            ..entry.clone()
        };
        assert!(!corrupted.verify());
    }

    /// Truncation resets the sequence and leaves nothing to recover.
    #[test]
    fn test_wal_truncate() {
        let dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::new(dir.path(), WALConfig::default()).unwrap();

        append_events(&wal, 5);
        wal.truncate().unwrap();

        assert_eq!(wal.current_sequence(), 0);
        assert_eq!(wal.recover().unwrap().len(), 0);
    }
}