// actionqueue_storage/recovery/replay.rs
use crate::recovery::reducer::ReplayReducer;
use crate::wal::reader::{WalReader, WalReaderError};
5
6pub struct ReplayDriver<R: WalReader> {
8 reader: R,
9 reducer: ReplayReducer,
10}
11
12impl<R: WalReader> ReplayDriver<R> {
13 pub fn new(reader: R, reducer: ReplayReducer) -> Self {
15 ReplayDriver { reader, reducer }
16 }
17
18 pub fn run(&mut self) -> Result<(), WalReaderError> {
34 self.run_with_applied_count().map(|_| ())
35 }
36
37 pub fn run_with_applied_count(&mut self) -> Result<u64, WalReaderError> {
41 let mut applied_events = 0u64;
42 loop {
43 match self.reader.read_next() {
44 Ok(Some(event)) => {
45 self.reducer.apply(&event)?;
47 applied_events = applied_events.saturating_add(1);
48 }
49 Ok(None) => {
50 break;
52 }
53 Err(e) => {
54 return Err(e);
56 }
57 }
58 }
59 Ok(applied_events)
60 }
61
62 pub fn into_reducer(self) -> ReplayReducer {
64 self.reducer
65 }
66}
67
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use crate::recovery::reducer::ReplayReducer;
    use crate::recovery::replay::ReplayDriver;
    use crate::wal::event::{WalEvent, WalEventType};
    use crate::wal::reader::{WalReader, WalReaderError};

    /// In-memory reader used by the tests below: it hands out the queued
    /// events in order, then the optional error exactly once, then `Ok(None)`.
    #[derive(Debug)]
    struct TestWalReader {
        /// Events still waiting to be returned by `read_next`.
        events: VecDeque<WalEvent>,
        /// Error returned once after the queue is drained, if any.
        terminal_error: Option<WalReaderError>,
        /// Set once `read_next` has returned `Ok(None)`.
        is_end: bool,
    }

    impl TestWalReader {
        /// Creates a reader over `events` with an optional trailing error.
        fn new(events: Vec<WalEvent>, terminal_error: Option<WalReaderError>) -> Self {
            Self { events: events.into(), terminal_error, is_end: false }
        }
    }

    impl WalReader for TestWalReader {
        fn read_next(&mut self) -> Result<Option<WalEvent>, WalReaderError> {
            match self.events.pop_front() {
                Some(event) => Ok(Some(event)),
                // Queue drained: surface the one-shot error first, otherwise signal end.
                None => match self.terminal_error.take() {
                    Some(error) => Err(error),
                    None => {
                        self.is_end = true;
                        Ok(None)
                    }
                },
            }
        }

        fn seek_to_sequence(&mut self, _sequence: u64) -> Result<(), WalReaderError> {
            Ok(())
        }

        fn is_end(&self) -> bool {
            self.is_end
        }
    }

    /// Builds a driver over a `TestWalReader` and a fresh reducer.
    fn driver_over(
        events: Vec<WalEvent>,
        terminal_error: Option<WalReaderError>,
    ) -> ReplayDriver<TestWalReader> {
        ReplayDriver::new(TestWalReader::new(events, terminal_error), ReplayReducer::new())
    }

    #[test]
    fn run_returns_exact_applied_event_count_for_successful_replay() {
        let events = vec![
            WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 }),
            WalEvent::new(2, WalEventType::EngineResumed { timestamp: 11 }),
        ];
        let mut driver = driver_over(events, None);

        let outcome = driver.run_with_applied_count();
        let applied_events = outcome.expect("replay should succeed with count");

        assert_eq!(applied_events, 2);
    }

    #[test]
    fn run_preserves_typed_error_semantics_when_reader_errors() {
        let events = vec![WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 })];
        let failure = WalReaderError::IoError("boom".to_string());
        let mut driver = driver_over(events, Some(failure));

        let outcome = driver.run_with_applied_count();
        let error = outcome.expect_err("replay should fail with reader io error");

        assert!(matches!(error, WalReaderError::IoError(message) if message == "boom"));
    }
}