use crate::kernel::event::SequencedEvent;
use crate::kernel::identity::{RunId, Seq};
use crate::kernel::reducer::Reducer;
use crate::kernel::snapshot::{Snapshot, SnapshotStore};
use crate::kernel::state::KernelState;
use crate::kernel::EventStore;
use crate::kernel::KernelError;
pub struct ReplayCursor<S: KernelState> {
pub events: Box<dyn EventStore>,
pub snaps: Option<Box<dyn SnapshotStore<S>>>,
pub reducer: Box<dyn Reducer<S>>,
}
impl<S: KernelState> ReplayCursor<S> {
pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
self.replay_from(run_id, initial_state, None)
}
pub fn replay_from(
&self,
run_id: &RunId,
initial_state: S,
snapshot: Option<&Snapshot<S>>,
) -> Result<S, KernelError> {
const FROM_SEQ: Seq = 1;
let (mut state, from_seq) = match snapshot {
Some(snap) => (snap.state.clone(), snap.at_seq + 1),
None => (initial_state, FROM_SEQ),
};
let sequenced = self.events.scan(run_id, from_seq)?;
for se in sequenced {
self.reducer.apply(&mut state, &se)?;
}
Ok(state)
}
pub fn replay_from_checkpoint(
&self,
run_id: &RunId,
initial_state: S,
) -> Result<S, KernelError> {
let from_snap = self
.snaps
.as_ref()
.and_then(|s| s.load_latest(run_id).ok().flatten());
self.replay_from(run_id, initial_state, from_snap.as_ref())
}
pub fn replay_step<'a>(
&'a self,
run_id: &RunId,
initial_state: S,
from_seq: Seq,
) -> Result<ReplayStepIter<'a, S>, KernelError> {
let sequenced = self.events.scan(run_id, from_seq)?;
Ok(ReplayStepIter {
state: initial_state,
events: sequenced,
reducer: self.reducer.as_ref(),
index: 0,
})
}
}
pub struct ReplayStepIter<'a, S: KernelState> {
state: S,
events: Vec<SequencedEvent>,
reducer: &'a dyn Reducer<S>,
index: usize,
}
impl<'a, S: KernelState> ReplayStepIter<'a, S> {
pub fn next_step(&mut self) -> Result<Option<(S, SequencedEvent)>, KernelError> {
let se = match self.events.get(self.index) {
Some(se) => se.clone(),
None => return Ok(None),
};
self.reducer.apply(&mut self.state, &se)?;
self.index += 1;
Ok(Some((self.state.clone(), se)))
}
pub fn current_state(&self) -> &S {
&self.state
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::event::Event;
use crate::kernel::event_store::InMemoryEventStore;
use crate::kernel::StateUpdatedOnlyReducer;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct TestState(u32);
impl crate::kernel::state::KernelState for TestState {
fn version(&self) -> u32 {
1
}
}
#[test]
fn replay_from_scratch_reconstructs_state() {
let events = InMemoryEventStore::new();
let run_id: RunId = "r1".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(1)).unwrap(),
},
Event::StateUpdated {
step_id: Some("b".into()),
payload: serde_json::to_value(&TestState(2)).unwrap(),
},
Event::Completed,
],
)
.unwrap();
let cursor = ReplayCursor::<TestState> {
events: Box::new(events),
snaps: None,
reducer: Box::new(StateUpdatedOnlyReducer),
};
let state = cursor.replay(&run_id, TestState(0)).unwrap();
assert_eq!(state, TestState(2));
}
#[test]
fn replay_from_checkpoint_applies_only_tail() {
let events = InMemoryEventStore::new();
let run_id: RunId = "r2".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(10)).unwrap(),
},
Event::StateUpdated {
step_id: Some("b".into()),
payload: serde_json::to_value(&TestState(20)).unwrap(),
},
Event::Completed,
],
)
.unwrap();
let snaps = crate::kernel::snapshot::InMemorySnapshotStore::new();
snaps
.save(&Snapshot {
run_id: run_id.clone(),
at_seq: 1,
state: TestState(10),
})
.unwrap();
let cursor = ReplayCursor::<TestState> {
events: Box::new(events),
snaps: Some(Box::new(snaps)),
reducer: Box::new(StateUpdatedOnlyReducer),
};
let state = cursor
.replay_from_checkpoint(&run_id, TestState(0))
.unwrap();
assert_eq!(state, TestState(20));
}
#[test]
fn replay_step_yields_state_after_each_event() {
let events = InMemoryEventStore::new();
let run_id: RunId = "r3".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(1)).unwrap(),
},
Event::StateUpdated {
step_id: Some("b".into()),
payload: serde_json::to_value(&TestState(2)).unwrap(),
},
],
)
.unwrap();
let cursor = ReplayCursor::<TestState> {
events: Box::new(events),
snaps: None,
reducer: Box::new(StateUpdatedOnlyReducer),
};
let mut iter = cursor.replay_step(&run_id, TestState(0), 1).unwrap();
let (s1, _) = iter.next_step().unwrap().unwrap();
assert_eq!(s1, TestState(1));
let (s2, _) = iter.next_step().unwrap().unwrap();
assert_eq!(s2, TestState(2));
assert!(iter.next_step().unwrap().is_none());
}
}