1use crate::kernel::event::SequencedEvent;
9use crate::kernel::identity::{RunId, Seq};
10use crate::kernel::reducer::Reducer;
11use crate::kernel::snapshot::{Snapshot, SnapshotStore};
12use crate::kernel::state::KernelState;
13use crate::kernel::EventStore;
14use crate::kernel::KernelError;
15
16pub struct ReplayCursor<S: KernelState> {
22 pub events: Box<dyn EventStore>,
24 pub snaps: Option<Box<dyn SnapshotStore<S>>>,
26 pub reducer: Box<dyn Reducer<S>>,
28}
29
30impl<S: KernelState> ReplayCursor<S> {
31 pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
36 self.replay_from(run_id, initial_state, None)
37 }
38
39 pub fn replay_from(
42 &self,
43 run_id: &RunId,
44 initial_state: S,
45 snapshot: Option<&Snapshot<S>>,
46 ) -> Result<S, KernelError> {
47 const FROM_SEQ: Seq = 1;
48 let (mut state, from_seq) = match snapshot {
49 Some(snap) => (snap.state.clone(), snap.at_seq + 1),
50 None => (initial_state, FROM_SEQ),
51 };
52 let sequenced = self.events.scan(run_id, from_seq)?;
53 for se in sequenced {
54 self.reducer.apply(&mut state, &se)?;
55 }
56 Ok(state)
57 }
58
59 pub fn replay_from_checkpoint(
62 &self,
63 run_id: &RunId,
64 initial_state: S,
65 ) -> Result<S, KernelError> {
66 let from_snap = self
67 .snaps
68 .as_ref()
69 .and_then(|s| s.load_latest(run_id).ok().flatten());
70 self.replay_from(run_id, initial_state, from_snap.as_ref())
71 }
72
73 pub fn replay_step<'a>(
76 &'a self,
77 run_id: &RunId,
78 initial_state: S,
79 from_seq: Seq,
80 ) -> Result<ReplayStepIter<'a, S>, KernelError> {
81 let sequenced = self.events.scan(run_id, from_seq)?;
82 Ok(ReplayStepIter {
83 state: initial_state,
84 events: sequenced,
85 reducer: self.reducer.as_ref(),
86 index: 0,
87 })
88 }
89}
90
91pub struct ReplayStepIter<'a, S: KernelState> {
93 state: S,
94 events: Vec<SequencedEvent>,
95 reducer: &'a dyn Reducer<S>,
96 index: usize,
97}
98
99impl<'a, S: KernelState> ReplayStepIter<'a, S> {
100 pub fn next_step(&mut self) -> Result<Option<(S, SequencedEvent)>, KernelError> {
102 let se = match self.events.get(self.index) {
103 Some(se) => se.clone(),
104 None => return Ok(None),
105 };
106 self.reducer.apply(&mut self.state, &se)?;
107 self.index += 1;
108 Ok(Some((self.state.clone(), se)))
109 }
110
111 pub fn current_state(&self) -> &S {
113 &self.state
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::event::Event;
    use crate::kernel::event_store::InMemoryEventStore;
    use crate::kernel::StateUpdatedOnlyReducer;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable state used to observe what replay produced.
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct TestState(u32);

    impl crate::kernel::state::KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    /// Builds a `StateUpdated` event for `step` whose payload is `TestState(value)`.
    fn state_updated(step: &'static str, value: u32) -> Event {
        Event::StateUpdated {
            step_id: Some(step.into()),
            payload: serde_json::to_value(&TestState(value)).unwrap(),
        }
    }

    /// Creates an in-memory event store holding `batch` for `run_id`.
    fn seeded_store(run_id: &RunId, batch: &[Event]) -> InMemoryEventStore {
        let store = InMemoryEventStore::new();
        store.append(run_id, batch).unwrap();
        store
    }

    #[test]
    fn replay_from_scratch_reconstructs_state() {
        let run_id = RunId::from("r1");
        let store = seeded_store(
            &run_id,
            &[
                state_updated("a", 1),
                state_updated("b", 2),
                Event::Completed,
            ],
        );
        let cursor = ReplayCursor::<TestState> {
            events: Box::new(store),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
        };

        let rebuilt = cursor.replay(&run_id, TestState(0)).unwrap();

        assert_eq!(rebuilt, TestState(2));
    }

    #[test]
    fn replay_from_checkpoint_applies_only_tail() {
        let run_id = RunId::from("r2");
        let store = seeded_store(
            &run_id,
            &[
                state_updated("a", 10),
                state_updated("b", 20),
                Event::Completed,
            ],
        );

        // Snapshot taken after the first event, so only the tail is replayed.
        let snapshots = crate::kernel::snapshot::InMemorySnapshotStore::new();
        let checkpoint = Snapshot {
            run_id: run_id.clone(),
            at_seq: 1,
            state: TestState(10),
        };
        snapshots.save(&checkpoint).unwrap();

        let cursor = ReplayCursor::<TestState> {
            events: Box::new(store),
            snaps: Some(Box::new(snapshots)),
            reducer: Box::new(StateUpdatedOnlyReducer),
        };

        let rebuilt = cursor
            .replay_from_checkpoint(&run_id, TestState(0))
            .unwrap();

        assert_eq!(rebuilt, TestState(20));
    }

    #[test]
    fn replay_step_yields_state_after_each_event() {
        let run_id = RunId::from("r3");
        let store = seeded_store(
            &run_id,
            &[state_updated("a", 1), state_updated("b", 2)],
        );
        let cursor = ReplayCursor::<TestState> {
            events: Box::new(store),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
        };

        let mut stepper = cursor.replay_step(&run_id, TestState(0), 1).unwrap();

        let (first, _) = stepper.next_step().unwrap().unwrap();
        assert_eq!(first, TestState(1));

        let (second, _) = stepper.next_step().unwrap().unwrap();
        assert_eq!(second, TestState(2));

        assert!(stepper.next_step().unwrap().is_none());
    }
}