use crate::recovery::reducer::ReplayReducer;
use crate::wal::reader::{WalReader, WalReaderError};
pub struct ReplayDriver<R: WalReader> {
reader: R,
reducer: ReplayReducer,
}
impl<R: WalReader> ReplayDriver<R> {
pub fn new(reader: R, reducer: ReplayReducer) -> Self {
ReplayDriver { reader, reducer }
}
pub fn run(&mut self) -> Result<(), WalReaderError> {
self.run_with_applied_count().map(|_| ())
}
pub fn run_with_applied_count(&mut self) -> Result<u64, WalReaderError> {
let mut applied_events = 0u64;
loop {
match self.reader.read_next() {
Ok(Some(event)) => {
self.reducer.apply(&event)?;
applied_events = applied_events.saturating_add(1);
}
Ok(None) => {
break;
}
Err(e) => {
return Err(e);
}
}
}
Ok(applied_events)
}
pub fn into_reducer(self) -> ReplayReducer {
self.reducer
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use crate::recovery::reducer::ReplayReducer;
use crate::recovery::replay::ReplayDriver;
use crate::wal::event::{WalEvent, WalEventType};
use crate::wal::reader::{WalReader, WalReaderError};
#[derive(Debug)]
struct TestWalReader {
events: VecDeque<WalEvent>,
terminal_error: Option<WalReaderError>,
is_end: bool,
}
impl TestWalReader {
fn new(events: Vec<WalEvent>, terminal_error: Option<WalReaderError>) -> Self {
Self { events: VecDeque::from(events), terminal_error, is_end: false }
}
}
impl WalReader for TestWalReader {
fn read_next(&mut self) -> Result<Option<WalEvent>, WalReaderError> {
if let Some(event) = self.events.pop_front() {
return Ok(Some(event));
}
if let Some(error) = self.terminal_error.take() {
return Err(error);
}
self.is_end = true;
Ok(None)
}
fn seek_to_sequence(&mut self, _sequence: u64) -> Result<(), WalReaderError> {
Ok(())
}
fn is_end(&self) -> bool {
self.is_end
}
}
#[test]
fn run_returns_exact_applied_event_count_for_successful_replay() {
let reader = TestWalReader::new(
vec![
WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 }),
WalEvent::new(2, WalEventType::EngineResumed { timestamp: 11 }),
],
None,
);
let mut driver = ReplayDriver::new(reader, ReplayReducer::new());
let applied_events =
driver.run_with_applied_count().expect("replay should succeed with count");
assert_eq!(applied_events, 2);
}
#[test]
fn run_preserves_typed_error_semantics_when_reader_errors() {
let reader = TestWalReader::new(
vec![WalEvent::new(1, WalEventType::EnginePaused { timestamp: 10 })],
Some(WalReaderError::IoError("boom".to_string())),
);
let mut driver = ReplayDriver::new(reader, ReplayReducer::new());
let error =
driver.run_with_applied_count().expect_err("replay should fail with reader io error");
assert!(matches!(error, WalReaderError::IoError(message) if message == "boom"));
}
}