Skip to main content

oris_kernel/kernel/
replay_cursor.rs

1//! Replay cursor engine: reconstruct state from the event log without live side effects.
2//!
3//! The replay loop is: **Load checkpoint → Replay events → Inject recorded outputs → Reconstruct state.**
4//! This engine holds only the event store, optional snapshot store, and reducer. It has **no**
5//! executor or step function, so **live tool execution is hard-disabled** during replay.
6//! Recorded action outputs (ActionSucceeded/ActionFailed) are already in the log and applied by the reducer.
7
8use crate::kernel::event::SequencedEvent;
9use crate::kernel::identity::{RunId, Seq};
10use crate::kernel::reducer::Reducer;
11use crate::kernel::snapshot::{Snapshot, SnapshotStore};
12use crate::kernel::state::KernelState;
13use crate::kernel::EventStore;
14use crate::kernel::KernelError;
15
16/// Replay cursor: reconstructs state from the event log. No executor, no live execution.
17///
18/// Use this when you need to replay a run (e.g. verify, debug, or resume). The cursor
19/// loads the latest checkpoint if available, then applies all subsequent events in order.
20/// Action results are taken from the log (ActionSucceeded/ActionFailed), not from live execution.
21pub struct ReplayCursor<S: KernelState> {
22    /// Event store (source of truth).
23    pub events: Box<dyn EventStore>,
24    /// Optional snapshot store for checkpoint-based replay (optimization).
25    pub snaps: Option<Box<dyn SnapshotStore<S>>>,
26    /// Reducer to apply each event to state (deterministic).
27    pub reducer: Box<dyn Reducer<S>>,
28}
29
30impl<S: KernelState> ReplayCursor<S> {
31    /// Replays the run: load checkpoint (if any) → replay events → return final state.
32    ///
33    /// Recorded outputs are injected via the event log (reducer applies ActionSucceeded/ActionFailed).
34    /// No live tool or LLM execution occurs.
35    pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
36        self.replay_from(run_id, initial_state, None)
37    }
38
39    /// Replays from an optional snapshot. If `snapshot` is provided, state starts at
40    /// `snapshot.state` and only events with seq > snapshot.at_seq are applied.
41    pub fn replay_from(
42        &self,
43        run_id: &RunId,
44        initial_state: S,
45        snapshot: Option<&Snapshot<S>>,
46    ) -> Result<S, KernelError> {
47        const FROM_SEQ: Seq = 1;
48        let (mut state, from_seq) = match snapshot {
49            Some(snap) => (snap.state.clone(), snap.at_seq + 1),
50            None => (initial_state, FROM_SEQ),
51        };
52        let sequenced = self.events.scan(run_id, from_seq)?;
53        for se in sequenced {
54            self.reducer.apply(&mut state, &se)?;
55        }
56        Ok(state)
57    }
58
59    /// Replays using the latest snapshot from the snapshot store (if any).
60    /// Equivalent to load checkpoint → replay events → reconstruct state.
61    pub fn replay_from_checkpoint(
62        &self,
63        run_id: &RunId,
64        initial_state: S,
65    ) -> Result<S, KernelError> {
66        let from_snap = self
67            .snaps
68            .as_ref()
69            .and_then(|s| s.load_latest(run_id).ok().flatten());
70        self.replay_from(run_id, initial_state, from_snap.as_ref())
71    }
72
73    /// Replays event-by-event, yielding state after each applied event (for debugging/verify).
74    /// No live execution; each event is applied via the reducer only.
75    pub fn replay_step<'a>(
76        &'a self,
77        run_id: &RunId,
78        initial_state: S,
79        from_seq: Seq,
80    ) -> Result<ReplayStepIter<'a, S>, KernelError> {
81        let sequenced = self.events.scan(run_id, from_seq)?;
82        Ok(ReplayStepIter {
83            state: initial_state,
84            events: sequenced,
85            reducer: self.reducer.as_ref(),
86            index: 0,
87        })
88    }
89}
90
91/// Iterator that yields (state_after_event, sequenced_event) when replaying step-by-step.
92pub struct ReplayStepIter<'a, S: KernelState> {
93    state: S,
94    events: Vec<SequencedEvent>,
95    reducer: &'a dyn Reducer<S>,
96    index: usize,
97}
98
99impl<'a, S: KernelState> ReplayStepIter<'a, S> {
100    /// Returns the current state and the next event index (1-based from run), or None if done.
101    pub fn next_step(&mut self) -> Result<Option<(S, SequencedEvent)>, KernelError> {
102        let se = match self.events.get(self.index) {
103            Some(se) => se.clone(),
104            None => return Ok(None),
105        };
106        self.reducer.apply(&mut self.state, &se)?;
107        self.index += 1;
108        Ok(Some((self.state.clone(), se)))
109    }
110
111    /// Current state (after all steps applied so far).
112    pub fn current_state(&self) -> &S {
113        &self.state
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use crate::kernel::event::Event;
121    use crate::kernel::event_store::InMemoryEventStore;
122    use crate::kernel::StateUpdatedOnlyReducer;
123    use serde::{Deserialize, Serialize};
124
125    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
126    struct TestState(u32);
127
128    impl crate::kernel::state::KernelState for TestState {
129        fn version(&self) -> u32 {
130            1
131        }
132    }
133
134    #[test]
135    fn replay_from_scratch_reconstructs_state() {
136        let events = InMemoryEventStore::new();
137        let run_id: RunId = "r1".into();
138        events
139            .append(
140                &run_id,
141                &[
142                    Event::StateUpdated {
143                        step_id: Some("a".into()),
144                        payload: serde_json::to_value(&TestState(1)).unwrap(),
145                    },
146                    Event::StateUpdated {
147                        step_id: Some("b".into()),
148                        payload: serde_json::to_value(&TestState(2)).unwrap(),
149                    },
150                    Event::Completed,
151                ],
152            )
153            .unwrap();
154        let cursor = ReplayCursor::<TestState> {
155            events: Box::new(events),
156            snaps: None,
157            reducer: Box::new(StateUpdatedOnlyReducer),
158        };
159        let state = cursor.replay(&run_id, TestState(0)).unwrap();
160        assert_eq!(state, TestState(2));
161    }
162
163    #[test]
164    fn replay_from_checkpoint_applies_only_tail() {
165        let events = InMemoryEventStore::new();
166        let run_id: RunId = "r2".into();
167        events
168            .append(
169                &run_id,
170                &[
171                    Event::StateUpdated {
172                        step_id: Some("a".into()),
173                        payload: serde_json::to_value(&TestState(10)).unwrap(),
174                    },
175                    Event::StateUpdated {
176                        step_id: Some("b".into()),
177                        payload: serde_json::to_value(&TestState(20)).unwrap(),
178                    },
179                    Event::Completed,
180                ],
181            )
182            .unwrap();
183        let snaps = crate::kernel::snapshot::InMemorySnapshotStore::new();
184        snaps
185            .save(&Snapshot {
186                run_id: run_id.clone(),
187                at_seq: 1,
188                state: TestState(10),
189            })
190            .unwrap();
191        let cursor = ReplayCursor::<TestState> {
192            events: Box::new(events),
193            snaps: Some(Box::new(snaps)),
194            reducer: Box::new(StateUpdatedOnlyReducer),
195        };
196        let state = cursor
197            .replay_from_checkpoint(&run_id, TestState(0))
198            .unwrap();
199        assert_eq!(state, TestState(20));
200    }
201
202    #[test]
203    fn replay_step_yields_state_after_each_event() {
204        let events = InMemoryEventStore::new();
205        let run_id: RunId = "r3".into();
206        events
207            .append(
208                &run_id,
209                &[
210                    Event::StateUpdated {
211                        step_id: Some("a".into()),
212                        payload: serde_json::to_value(&TestState(1)).unwrap(),
213                    },
214                    Event::StateUpdated {
215                        step_id: Some("b".into()),
216                        payload: serde_json::to_value(&TestState(2)).unwrap(),
217                    },
218                ],
219            )
220            .unwrap();
221        let cursor = ReplayCursor::<TestState> {
222            events: Box::new(events),
223            snaps: None,
224            reducer: Box::new(StateUpdatedOnlyReducer),
225        };
226        let mut iter = cursor.replay_step(&run_id, TestState(0), 1).unwrap();
227        let (s1, _) = iter.next_step().unwrap().unwrap();
228        assert_eq!(s1, TestState(1));
229        let (s2, _) = iter.next_step().unwrap().unwrap();
230        assert_eq!(s2, TestState(2));
231        assert!(iter.next_step().unwrap().is_none());
232    }
233}