// laminar_storage/incremental/recovery.rs

1//! Recovery manager for checkpoint + WAL replay.
2//!
3//! This module provides recovery functionality that combines:
4//! 1. Loading state from the latest checkpoint
5//! 2. Replaying WAL entries after the checkpoint
6//!
7//! ## Core Invariant
8//!
9//! ```text
10//! Checkpoint(epoch) + WAL.replay(epoch..current) = Consistent State
11//! ```
12//!
13//! ## Recovery Process
14//!
15//! 1. Find the latest valid checkpoint
16//! 2. Load checkpoint state (mmap state snapshot)
17//! 3. Replay WAL entries from checkpoint's WAL position
18//! 4. Restore watermark and source offsets
19
20#[allow(clippy::disallowed_types)] // cold path: incremental checkpoint
21use std::collections::HashMap;
22use std::fs;
23use std::path::{Path, PathBuf};
24use std::time::Duration;
25
26use laminar_core::state::StateSnapshot;
27use tracing::{debug, info, warn};
28
29use super::error::IncrementalCheckpointError;
30use super::manager::{
31    CheckpointConfig, IncrementalCheckpointManager, IncrementalCheckpointMetadata,
32};
33use crate::wal::{WalEntry, WalReadResult, WriteAheadLog};
34
/// Result of WAL replay operation.
///
/// Aggregated while scanning WAL entries from a start position until EOF,
/// the first detected corruption, or the configured entry limit.
struct WalReplayResult {
    /// Number of entries replayed.
    entries_replayed: u64,
    /// Final WAL position after replay — the reader position recorded after
    /// the last successfully replayed entry (equals the start position when
    /// nothing was replayed).
    final_position: u64,
    /// Source offsets from commit entries; the last commit seen for a given
    /// source wins.
    source_offsets: HashMap<String, u64>,
    /// Watermark from commit entries; the last non-`None` value observed.
    watermark: Option<i64>,
    /// State changes (key, value or None for delete). Populated only when
    /// `RecoveryConfig::collect_state_changes` is enabled.
    state_changes: Vec<(Vec<u8>, Option<Vec<u8>>)>,
}
48
/// Recovered state from checkpoint + WAL replay.
///
/// Produced by [`RecoveryManager::recover`]: checkpoint values are loaded
/// first, then overlaid with anything newer found during WAL replay.
#[derive(Debug)]
pub struct RecoveredState {
    /// The epoch at which recovery completed.
    pub epoch: u64,
    /// State snapshot (if checkpoint had state data).
    pub state_snapshot: Option<StateSnapshot>,
    /// WAL position after replay.
    pub wal_position: u64,
    /// Number of WAL entries replayed.
    pub wal_entries_replayed: u64,
    /// Source offsets for exactly-once semantics.
    pub source_offsets: HashMap<String, u64>,
    /// Watermark at recovery point.
    pub watermark: Option<i64>,
    /// Checkpoint ID that was used (if any).
    pub checkpoint_id: Option<u64>,
    /// State changes from WAL replay (key, value or None for delete).
    /// Populated only when `RecoveryConfig::collect_state_changes` is set.
    pub state_changes: Vec<(Vec<u8>, Option<Vec<u8>>)>,
}
69
70impl RecoveredState {
71    /// Creates a new empty recovered state.
72    #[must_use]
73    pub fn empty() -> Self {
74        Self {
75            epoch: 0,
76            state_snapshot: None,
77            wal_position: 0,
78            wal_entries_replayed: 0,
79            source_offsets: HashMap::new(),
80            watermark: None,
81            checkpoint_id: None,
82            state_changes: Vec::new(),
83        }
84    }
85
86    /// Returns true if recovery found any state.
87    #[must_use]
88    pub fn has_state(&self) -> bool {
89        self.state_snapshot.is_some() || !self.state_changes.is_empty()
90    }
91}
92
/// Configuration for recovery.
///
/// Constructed via [`RecoveryConfig::new`] and customized with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
    /// Path to checkpoint directory.
    pub checkpoint_dir: PathBuf,
    /// Path to WAL file.
    pub wal_path: PathBuf,
    /// Whether to repair torn writes in WAL before replay (default: true).
    pub repair_wal: bool,
    /// Whether to collect state changes for incremental application
    /// (default: false).
    pub collect_state_changes: bool,
    /// Maximum WAL entries to replay (0 = unlimited, the default).
    pub max_wal_entries: u64,
}
107
108impl RecoveryConfig {
109    /// Creates a new recovery configuration.
110    #[must_use]
111    pub fn new(checkpoint_dir: &Path, wal_path: &Path) -> Self {
112        Self {
113            checkpoint_dir: checkpoint_dir.to_path_buf(),
114            wal_path: wal_path.to_path_buf(),
115            repair_wal: true,
116            collect_state_changes: false,
117            max_wal_entries: 0,
118        }
119    }
120
121    /// Enables WAL repair on recovery.
122    #[must_use]
123    pub fn with_repair_wal(mut self, enabled: bool) -> Self {
124        self.repair_wal = enabled;
125        self
126    }
127
128    /// Enables collection of state changes.
129    #[must_use]
130    pub fn with_collect_state_changes(mut self, enabled: bool) -> Self {
131        self.collect_state_changes = enabled;
132        self
133    }
134
135    /// Sets maximum WAL entries to replay.
136    #[must_use]
137    pub fn with_max_wal_entries(mut self, max: u64) -> Self {
138        self.max_wal_entries = max;
139        self
140    }
141}
142
/// Recovery manager for restoring state from checkpoints and WAL.
///
/// Holds only its configuration; `recover` borrows `&self`, so a single
/// manager can be used for repeated recovery attempts.
pub struct RecoveryManager {
    /// Recovery configuration.
    config: RecoveryConfig,
}
148
149impl RecoveryManager {
150    /// Creates a new recovery manager.
151    #[must_use]
152    pub fn new(config: RecoveryConfig) -> Self {
153        Self { config }
154    }
155
156    /// Performs full recovery from checkpoint + WAL.
157    ///
158    /// This is the main recovery entry point that:
159    /// 1. Finds the latest valid checkpoint
160    /// 2. Loads checkpoint state
161    /// 3. Replays WAL entries after the checkpoint
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if recovery fails.
166    pub fn recover(&self) -> Result<RecoveredState, IncrementalCheckpointError> {
167        info!(
168            checkpoint_dir = %self.config.checkpoint_dir.display(),
169            wal_path = %self.config.wal_path.display(),
170            "Starting recovery"
171        );
172
173        let mut result = RecoveredState::empty();
174
175        // Step 1: Find and load latest checkpoint
176        let checkpoint_config = CheckpointConfig::new(&self.config.checkpoint_dir);
177        let manager = IncrementalCheckpointManager::new(checkpoint_config)?;
178
179        if let Some(checkpoint) = manager.find_latest_checkpoint()? {
180            result = Self::load_checkpoint(&manager, &checkpoint);
181            info!(
182                checkpoint_id = checkpoint.id,
183                epoch = checkpoint.epoch,
184                wal_position = checkpoint.wal_position,
185                "Loaded checkpoint"
186            );
187        } else {
188            debug!("No checkpoint found, starting from WAL beginning");
189        }
190
191        // Step 2: Replay WAL
192        if self.config.wal_path.exists() {
193            let wal_result = self.replay_wal(result.wal_position)?;
194            result.wal_entries_replayed = wal_result.entries_replayed;
195            result.wal_position = wal_result.final_position;
196
197            // Update source offsets and watermark from WAL commits
198            for (source, offset) in wal_result.source_offsets {
199                result.source_offsets.insert(source, offset);
200            }
201            if wal_result.watermark.is_some() {
202                result.watermark = wal_result.watermark;
203            }
204
205            // Collect state changes if configured
206            if self.config.collect_state_changes {
207                result.state_changes = wal_result.state_changes;
208            }
209
210            info!(
211                entries_replayed = wal_result.entries_replayed,
212                final_position = wal_result.final_position,
213                "WAL replay complete"
214            );
215        }
216
217        Ok(result)
218    }
219
220    /// Loads state from a checkpoint.
221    fn load_checkpoint(
222        manager: &IncrementalCheckpointManager,
223        checkpoint: &IncrementalCheckpointMetadata,
224    ) -> RecoveredState {
225        let mut result = RecoveredState::empty();
226        result.checkpoint_id = Some(checkpoint.id);
227        result.epoch = checkpoint.epoch;
228        result.wal_position = checkpoint.wal_position;
229        result.source_offsets.clone_from(&checkpoint.source_offsets);
230        result.watermark = checkpoint.watermark;
231
232        // Try to load state snapshot
233        if let Ok(state_data) = manager.load_checkpoint_state(checkpoint.id) {
234            match StateSnapshot::from_bytes(&state_data) {
235                Ok(snapshot) => {
236                    result.state_snapshot = Some(snapshot);
237                }
238                Err(e) => {
239                    warn!(
240                        checkpoint_id = checkpoint.id,
241                        error = %e,
242                        "Failed to deserialize state snapshot"
243                    );
244                }
245            }
246        }
247
248        result
249    }
250
251    /// Replays WAL from the given position.
252    fn replay_wal(
253        &self,
254        start_position: u64,
255    ) -> Result<WalReplayResult, IncrementalCheckpointError> {
256        let mut wal = WriteAheadLog::new(&self.config.wal_path, Duration::from_millis(100))
257            .map_err(|e| IncrementalCheckpointError::Wal(e.to_string()))?;
258
259        // Repair WAL if configured
260        if self.config.repair_wal {
261            if let Err(e) = wal.repair() {
262                warn!(error = %e, "WAL repair failed, continuing anyway");
263            }
264        }
265
266        let mut reader = wal
267            .read_from(start_position)
268            .map_err(|e| IncrementalCheckpointError::Wal(e.to_string()))?;
269
270        let mut result = WalReplayResult {
271            entries_replayed: 0,
272            final_position: start_position,
273            source_offsets: HashMap::new(),
274            watermark: None,
275            state_changes: Vec::new(),
276        };
277
278        let max_entries = if self.config.max_wal_entries > 0 {
279            self.config.max_wal_entries
280        } else {
281            u64::MAX
282        };
283
284        loop {
285            if result.entries_replayed >= max_entries {
286                debug!(max_entries, "Reached max WAL entries limit");
287                break;
288            }
289
290            match reader.read_next() {
291                Ok(WalReadResult::Entry(entry)) => {
292                    result.final_position = reader.position();
293                    result.entries_replayed += 1;
294
295                    match entry {
296                        WalEntry::Put { key, value } => {
297                            if self.config.collect_state_changes {
298                                result.state_changes.push((key, Some(value)));
299                            }
300                        }
301                        WalEntry::Delete { key } => {
302                            if self.config.collect_state_changes {
303                                result.state_changes.push((key, None));
304                            }
305                        }
306                        WalEntry::Commit { offsets, watermark } => {
307                            for (source, offset) in offsets {
308                                result.source_offsets.insert(source, offset);
309                            }
310                            if watermark.is_some() {
311                                result.watermark = watermark;
312                            }
313                        }
314                        WalEntry::Checkpoint { id } => {
315                            debug!(checkpoint_id = id, "Skipping checkpoint marker in WAL");
316                        }
317                    }
318                }
319                Ok(WalReadResult::Eof) => {
320                    debug!("Reached end of WAL");
321                    break;
322                }
323                Ok(WalReadResult::TornWrite { position, reason }) => {
324                    warn!(position, reason, "Torn write detected, stopping replay");
325                    break;
326                }
327                Ok(WalReadResult::ChecksumMismatch { position, .. }) => {
328                    warn!(position, "CRC mismatch detected, stopping replay");
329                    break;
330                }
331                Ok(WalReadResult::Corrupted { position, reason }) => {
332                    warn!(
333                        position,
334                        reason, "[LDB-6006] Corrupted WAL entry detected, stopping replay"
335                    );
336                    break;
337                }
338                Err(e) => {
339                    return Err(IncrementalCheckpointError::Wal(format!(
340                        "WAL read error: {e}"
341                    )));
342                }
343            }
344        }
345
346        Ok(result)
347    }
348
349    /// Convenience method for simple recovery.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if recovery fails.
354    pub fn recover_simple(
355        checkpoint_dir: &Path,
356        wal_path: &Path,
357    ) -> Result<RecoveredState, IncrementalCheckpointError> {
358        let config = RecoveryConfig::new(checkpoint_dir, wal_path);
359        let manager = RecoveryManager::new(config);
360        manager.recover()
361    }
362
363    /// Recovers and returns state changes for application to state store.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if recovery fails.
368    pub fn recover_with_changes(
369        checkpoint_dir: &Path,
370        wal_path: &Path,
371    ) -> Result<RecoveredState, IncrementalCheckpointError> {
372        let config = RecoveryConfig::new(checkpoint_dir, wal_path).with_collect_state_changes(true);
373        let manager = RecoveryManager::new(config);
374        manager.recover()
375    }
376}
377
378/// Validates a checkpoint directory.
379///
380/// # Errors
381///
382/// Returns an error if the checkpoint is invalid.
383pub fn validate_checkpoint(
384    checkpoint_dir: &Path,
385) -> Result<IncrementalCheckpointMetadata, IncrementalCheckpointError> {
386    let metadata_path = checkpoint_dir.join("metadata.json");
387
388    if !metadata_path.exists() {
389        return Err(IncrementalCheckpointError::NotFound(
390            "metadata.json not found".to_string(),
391        ));
392    }
393
394    let metadata_json = fs::read_to_string(&metadata_path)?;
395    let metadata = IncrementalCheckpointMetadata::from_json(&metadata_json)?;
396
397    // Validate state file if expected
398    let state_path = checkpoint_dir.join("state.bin");
399    if state_path.exists() {
400        let state_data = fs::read(&state_path)?;
401        StateSnapshot::from_bytes(&state_data)
402            .map_err(|e| IncrementalCheckpointError::Corruption(e.to_string()))?;
403    }
404
405    Ok(metadata)
406}
407
408/// Calculates the WAL size for checkpoint decisions.
409///
410/// # Errors
411///
412/// Returns an error if the file cannot be read.
413pub fn wal_size(wal_path: &Path) -> Result<u64, IncrementalCheckpointError> {
414    if !wal_path.exists() {
415        return Ok(0);
416    }
417
418    let metadata = fs::metadata(wal_path)?;
419    Ok(metadata.len())
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;

    // An empty RecoveredState reports no state at all.
    #[test]
    fn test_recovered_state_empty() {
        let state = RecoveredState::empty();
        assert_eq!(state.epoch, 0);
        assert!(state.state_snapshot.is_none());
        assert!(state.source_offsets.is_empty());
        assert!(!state.has_state());
    }

    // Builder methods set each RecoveryConfig knob.
    #[test]
    fn test_recovery_config() {
        let config = RecoveryConfig::new(Path::new("/checkpoints"), Path::new("/wal.log"))
            .with_repair_wal(true)
            .with_collect_state_changes(true)
            .with_max_wal_entries(1000);

        assert!(config.repair_wal);
        assert!(config.collect_state_changes);
        assert_eq!(config.max_wal_entries, 1000);
    }

    // With no checkpoint and no WAL file, recovery yields the empty state.
    #[test]
    fn test_recovery_no_checkpoint_no_wal() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        fs::create_dir_all(&checkpoint_dir).unwrap();

        let config = RecoveryConfig::new(&checkpoint_dir, &wal_path);
        let manager = RecoveryManager::new(config);

        let result = manager.recover().unwrap();
        assert_eq!(result.epoch, 0);
        assert!(result.state_snapshot.is_none());
        assert!(result.checkpoint_id.is_none());
        assert_eq!(result.wal_entries_replayed, 0);
    }

    // Checkpoint metadata, offsets, watermark, and snapshot are restored
    // when no WAL file exists.
    #[test]
    fn test_recovery_with_checkpoint_only() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        // Create a checkpoint
        let config = CheckpointConfig::new(&checkpoint_dir);
        let mut ckpt_manager = IncrementalCheckpointManager::new(config).unwrap();
        ckpt_manager.set_epoch(42);

        let mut offsets = HashMap::new();
        offsets.insert("source1".to_string(), 100);

        let state_data = StateSnapshot::new(vec![
            (b"key1".to_vec(), b"value1".to_vec()),
            (b"key2".to_vec(), b"value2".to_vec()),
        ])
        .to_bytes()
        .unwrap();

        let metadata = ckpt_manager
            .create_checkpoint_with_state(500, offsets, Some(5000), &state_data)
            .unwrap();

        // Recover
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_path);
        let recovery_manager = RecoveryManager::new(recovery_config);

        let result = recovery_manager.recover().unwrap();
        assert_eq!(result.epoch, 42);
        assert_eq!(result.checkpoint_id, Some(metadata.id));
        assert_eq!(result.wal_position, 500);
        assert_eq!(result.watermark, Some(5000));
        assert_eq!(result.source_offsets.get("source1"), Some(&100));
        assert!(result.state_snapshot.is_some());

        let snapshot = result.state_snapshot.unwrap();
        assert_eq!(snapshot.len(), 2);
    }

    // WAL-only recovery replays entries and (when enabled) collects the
    // Put/Delete changes; the Commit entry supplies offsets + watermark.
    #[test]
    fn test_recovery_with_wal_only() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        fs::create_dir_all(&checkpoint_dir).unwrap();

        // Create WAL entries
        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.set_sync_on_write(true);

            wal.append(&WalEntry::Put {
                key: b"key1".to_vec(),
                value: b"value1".to_vec(),
            })
            .unwrap();

            wal.append(&WalEntry::Put {
                key: b"key2".to_vec(),
                value: b"value2".to_vec(),
            })
            .unwrap();

            let mut offsets = HashMap::new();
            offsets.insert("source1".to_string(), 50);
            wal.append(&WalEntry::Commit {
                offsets,
                watermark: Some(1000),
            })
            .unwrap();

            wal.sync().unwrap();
        }

        // Recover with state changes collection
        let config =
            RecoveryConfig::new(&checkpoint_dir, &wal_path).with_collect_state_changes(true);
        let manager = RecoveryManager::new(config);

        let result = manager.recover().unwrap();
        assert!(result.checkpoint_id.is_none());
        assert_eq!(result.wal_entries_replayed, 3);
        assert_eq!(result.watermark, Some(1000));
        assert_eq!(result.source_offsets.get("source1"), Some(&50));
        assert_eq!(result.state_changes.len(), 2);
    }

    // Core invariant: checkpoint + WAL replay; WAL commit values override
    // the checkpoint's watermark and offsets.
    #[test]
    fn test_recovery_checkpoint_plus_wal() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        // Create checkpoint
        let config = CheckpointConfig::new(&checkpoint_dir);
        let mut ckpt_manager = IncrementalCheckpointManager::new(config).unwrap();
        ckpt_manager.set_epoch(10);

        let state_data = StateSnapshot::new(vec![(b"key1".to_vec(), b"value1".to_vec())])
            .to_bytes()
            .unwrap();

        ckpt_manager
            .create_checkpoint_with_state(0, HashMap::new(), Some(1000), &state_data)
            .unwrap();

        // Create WAL entries after checkpoint
        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.set_sync_on_write(true);

            wal.append(&WalEntry::Put {
                key: b"key2".to_vec(),
                value: b"value2".to_vec(),
            })
            .unwrap();

            wal.append(&WalEntry::Delete {
                key: b"key1".to_vec(),
            })
            .unwrap();

            let mut offsets = HashMap::new();
            offsets.insert("source1".to_string(), 100);
            wal.append(&WalEntry::Commit {
                offsets,
                watermark: Some(2000),
            })
            .unwrap();

            wal.sync().unwrap();
        }

        // Recover
        let recovery_config =
            RecoveryConfig::new(&checkpoint_dir, &wal_path).with_collect_state_changes(true);
        let recovery_manager = RecoveryManager::new(recovery_config);

        let result = recovery_manager.recover().unwrap();
        assert!(result.checkpoint_id.is_some());
        assert_eq!(result.wal_entries_replayed, 3);
        assert_eq!(result.watermark, Some(2000)); // Updated by WAL
        assert_eq!(result.source_offsets.get("source1"), Some(&100));
        assert!(result.state_snapshot.is_some());
        assert_eq!(result.state_changes.len(), 2);
    }

    // validate_checkpoint round-trips the metadata written by the manager.
    #[test]
    fn test_validate_checkpoint() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");

        // Create checkpoint
        let config = CheckpointConfig::new(&checkpoint_dir);
        let mut manager = IncrementalCheckpointManager::new(config).unwrap();
        manager.set_epoch(5);

        let state_data = StateSnapshot::new(vec![(b"key".to_vec(), b"value".to_vec())])
            .to_bytes()
            .unwrap();

        let metadata = manager
            .create_checkpoint_with_state(100, HashMap::new(), None, &state_data)
            .unwrap();

        // Validate
        let checkpoint_path = metadata.checkpoint_path(&checkpoint_dir);
        let validated = validate_checkpoint(&checkpoint_path).unwrap();
        assert_eq!(validated.id, metadata.id);
        assert_eq!(validated.epoch, 5);
    }

    // wal_size: 0 for a missing file, positive after an appended entry.
    #[test]
    fn test_wal_size() {
        let temp_dir = TempDir::new().unwrap();
        let wal_path = temp_dir.path().join("wal.log");

        // Non-existent WAL
        assert_eq!(wal_size(&wal_path).unwrap(), 0);

        // Create WAL with data
        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.append(&WalEntry::Put {
                key: b"key".to_vec(),
                value: b"value".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        let size = wal_size(&wal_path).unwrap();
        assert!(size > 0);
    }

    // max_wal_entries caps replay even when more entries exist.
    #[test]
    fn test_recovery_max_entries() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        fs::create_dir_all(&checkpoint_dir).unwrap();

        // Create many WAL entries
        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.set_sync_on_write(true);

            for i in 0..100 {
                wal.append(&WalEntry::Put {
                    key: format!("key{i}").into_bytes(),
                    value: format!("value{i}").into_bytes(),
                })
                .unwrap();
            }
            wal.sync().unwrap();
        }

        // Recover with limit
        let config = RecoveryConfig::new(&checkpoint_dir, &wal_path).with_max_wal_entries(10);
        let manager = RecoveryManager::new(config);

        let result = manager.recover().unwrap();
        assert_eq!(result.wal_entries_replayed, 10);
    }
}
695}