Skip to main content

actionqueue_storage/recovery/
replay.rs

//! Recovery replay driver - orchestrates replay from WAL.
2
3use crate::recovery::reducer::ReplayReducer;
4use crate::wal::reader::{WalReader, WalReaderError};
5
6/// A replay driver that loads events from a reader and feeds them to a reducer.
7pub struct ReplayDriver<R: WalReader> {
8    reader: R,
9    reducer: ReplayReducer,
10}
11
12impl<R: WalReader> ReplayDriver<R> {
13    /// Creates a new replay driver with the given reader and reducer.
14    pub fn new(reader: R, reducer: ReplayReducer) -> Self {
15        ReplayDriver { reader, reducer }
16    }
17
18    /// Runs the replay, feeding events from the reader to the reducer.
19    ///
20    /// Reads events sequentially from the reader and applies them to the reducer.
21    /// If strict WAL corruption is encountered, returns `WalReaderError::Corruption`.
22    /// If all events are processed successfully, returns `Ok(())`.
23    ///
24    /// # Errors
25    ///
26    /// Returns `WalReaderError::Corruption` if strict corruption is detected
27    /// in the event stream.
28    ///
29    /// Returns `WalReaderError::IoError` if an I/O error occurs during reading.
30    ///
31    /// Returns `WalReaderError::ReducerError` if the reducer encounters an error applying an event
32    /// (e.g., invalid transition, duplicate event).
33    pub fn run(&mut self) -> Result<(), WalReaderError> {
34        self.run_with_applied_count().map(|_| ())
35    }
36
37    /// Runs replay and returns the authoritative count of applied replay events.
38    ///
39    /// This is the counting seam used by recovery bootstrap metrics population.
40    pub fn run_with_applied_count(&mut self) -> Result<u64, WalReaderError> {
41        let mut applied_events = 0u64;
42        loop {
43            match self.reader.read_next() {
44                Ok(Some(event)) => {
45                    // Apply the event to the reducer
46                    self.reducer.apply(&event)?;
47                    applied_events = applied_events.saturating_add(1);
48                }
49                Ok(None) => {
50                    // End of WAL reached successfully.
51                    break;
52                }
53                Err(e) => {
54                    // Propagate strict corruption or other reader/reducer errors.
55                    return Err(e);
56                }
57            }
58        }
59        Ok(applied_events)
60    }
61
62    /// Returns the reducer after replay has completed.
63    pub fn into_reducer(self) -> ReplayReducer {
64        self.reducer
65    }
66}
67
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use crate::recovery::reducer::ReplayReducer;
    use crate::recovery::replay::ReplayDriver;
    use crate::wal::event::{WalEvent, WalEventType};
    use crate::wal::reader::{WalReader, WalReaderError};

    /// Test WAL reader with deterministic event and error sequencing.
    ///
    /// Reads yield the queued events in order, then the terminal error (at
    /// most once) if one was configured, and `Ok(None)` from then on.
    #[derive(Debug)]
    struct TestWalReader {
        /// Events still to be handed out, front first.
        events: VecDeque<WalEvent>,
        /// Error returned by the first read after `events` is drained.
        terminal_error: Option<WalReaderError>,
        /// Set once a read has reported end-of-WAL via `Ok(None)`.
        is_end: bool,
    }

    impl TestWalReader {
        /// Builds a reader that replays `events` and then fails with
        /// `terminal_error`, or ends cleanly when it is `None`.
        fn new(events: Vec<WalEvent>, terminal_error: Option<WalReaderError>) -> Self {
            Self { events: VecDeque::from(events), terminal_error, is_end: false }
        }
    }

    impl WalReader for TestWalReader {
        fn read_next(&mut self) -> Result<Option<WalEvent>, WalReaderError> {
            if let Some(event) = self.events.pop_front() {
                return Ok(Some(event));
            }

            // `take` reports the error only once; later reads see a clean end.
            if let Some(error) = self.terminal_error.take() {
                return Err(error);
            }

            self.is_end = true;
            Ok(None)
        }

        // Seeking is a no-op here: these tests only exercise sequential reads.
        fn seek_to_sequence(&mut self, _sequence: u64) -> Result<(), WalReaderError> {
            Ok(())
        }

        fn is_end(&self) -> bool {
            self.is_end
        }
    }

    #[test]
    fn run_returns_exact_applied_event_count_for_successful_replay() {
        let reader = TestWalReader::new(
            vec![
                WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 }),
                WalEvent::new(2, WalEventType::EngineResumed { timestamp: 11 }),
            ],
            None,
        );
        let mut driver = ReplayDriver::new(reader, ReplayReducer::new());

        let applied_events =
            driver.run_with_applied_count().expect("replay should succeed with count");

        assert_eq!(applied_events, 2);
    }

    #[test]
    fn run_returns_zero_applied_event_count_for_empty_wal() {
        let reader = TestWalReader::new(Vec::new(), None);
        let mut driver = ReplayDriver::new(reader, ReplayReducer::new());

        let applied_events =
            driver.run_with_applied_count().expect("empty replay should succeed with count");

        assert_eq!(applied_events, 0);
    }

    #[test]
    fn run_consumes_all_events_and_discards_count() {
        let reader = TestWalReader::new(
            vec![
                WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 }),
                WalEvent::new(2, WalEventType::EngineResumed { timestamp: 11 }),
            ],
            None,
        );
        let mut driver = ReplayDriver::new(reader, ReplayReducer::new());

        driver.run().expect("replay should succeed");

        // The reader is drained, so a second pass has nothing left to apply.
        let remaining =
            driver.run_with_applied_count().expect("second replay pass should succeed");
        assert_eq!(remaining, 0);
    }

    #[test]
    fn run_preserves_typed_error_semantics_when_reader_errors() {
        let reader = TestWalReader::new(
            vec![WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 })],
            Some(WalReaderError::IoError("boom".to_string())),
        );
        let mut driver = ReplayDriver::new(reader, ReplayReducer::new());

        let error =
            driver.run_with_applied_count().expect_err("replay should fail with reader io error");

        assert!(matches!(error, WalReaderError::IoError(message) if message == "boom"));
    }
}
144}