Skip to main content

oris_kernel/kernel/
replay_resume.rs

1//! Replay-based resume: guarantee idempotent resume semantics.
2//!
3//! This module provides [ReplayResume] which enforces that resuming a suspended state
4//! is strictly Replay + Inject Decision, ensuring no reliance on active memory.
5
6use serde::{Deserialize, Serialize};
7
8use crate::kernel::event::{Event, SequencedEvent};
9use crate::kernel::identity::{RunId, Seq};
10use crate::kernel::reducer::Reducer;
11use crate::kernel::state::KernelState;
12use crate::kernel::EventStore;
13use crate::kernel::KernelError;
14
15/// Resume decision: the value injected when resuming from an interrupt.
16#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct ResumeDecision {
18    /// The resume value (e.g. user input, approved tool result).
19    pub value: serde_json::Value,
20    /// Optional metadata about the decision.
21    pub metadata: Option<serde_json::Value>,
22}
23
24impl ResumeDecision {
25    pub fn new(value: serde_json::Value) -> Self {
26        Self {
27            value,
28            metadata: None,
29        }
30    }
31
32    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
33        self.metadata = Some(metadata);
34        self
35    }
36}
37
38/// Result of a replay resume operation.
39#[derive(Clone, Debug, Serialize, Deserialize)]
40pub struct ResumeResult<S: KernelState> {
41    /// The final state after replay + inject decision.
42    pub state: S,
43    /// Number of events replayed.
44    pub events_replayed: usize,
45    /// Whether this was an idempotent resume (already at the same point).
46    pub idempotent: bool,
47}
48
49/// Replay-based resume: enforces Replay + Inject Decision semantics.
50pub struct ReplayResume<S: KernelState> {
51    /// Event store (source of truth).
52    pub events: Box<dyn EventStore>,
53    /// Reducer to apply events.
54    pub reducer: Box<dyn Reducer<S>>,
55}
56
57impl<S: KernelState + PartialEq> ReplayResume<S> {
58    /// Resumes execution by replaying from the event log and injecting the decision.
59    ///
60    /// This ensures no reliance on active memory - the state is reconstructed purely
61    /// from the event log plus the injected decision.
62    pub fn resume(
63        &self,
64        run_id: &RunId,
65        decision: ResumeDecision,
66        initial_state: S,
67    ) -> Result<ResumeResult<S>, KernelError> {
68        // 1. Get all events
69        let events = self.events.scan(run_id, 1)?;
70        let mut state = initial_state;
71
72        // 2. Check if already resumed (idempotent check) - must be done BEFORE injecting
73        let already_resumed = events
74            .last()
75            .map(|se| matches!(se.event, Event::Resumed { .. }))
76            .unwrap_or(false);
77
78        // 3. Replay all events that are not Resumed (skip existing Resumed events at end)
79        let replay_events: Vec<_> = events
80            .iter()
81            .filter(|se| !matches!(se.event, Event::Resumed { .. }))
82            .cloned()
83            .collect();
84
85        let mut events_replayed = 0;
86        for se in &replay_events {
87            self.reducer.apply(&mut state, &se)?;
88            events_replayed += 1;
89        }
90
91        // 4. If not already resumed, inject the decision
92        if !already_resumed {
93            let resume_seq = (replay_events.len() + 1) as Seq;
94            let resume_event = SequencedEvent {
95                seq: resume_seq,
96                event: Event::Resumed {
97                    value: decision.value,
98                },
99            };
100            self.reducer.apply(&mut state, &resume_event)?;
101            events_replayed += 1;
102        }
103
104        Ok(ResumeResult {
105            state,
106            events_replayed,
107            idempotent: already_resumed,
108        })
109    }
110
111    /// Verifies that resuming N times yields identical results (idempotent).
112    pub fn verify_idempotent(
113        &self,
114        run_id: &RunId,
115        decision: ResumeDecision,
116        initial_state: S,
117    ) -> Result<bool, KernelError> {
118        let result1 = self.resume(run_id, decision.clone(), initial_state.clone())?;
119        let result2 = self.resume(run_id, decision, initial_state)?;
120
121        // Both should have replayed the same number of events
122        if result1.events_replayed != result2.events_replayed {
123            return Ok(false);
124        }
125
126        // States should be identical
127        Ok(result1.state == result2.state)
128    }
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134    use crate::kernel::event_store::InMemoryEventStore;
135    use crate::kernel::StateUpdatedOnlyReducer;
136    use serde::{Deserialize, Serialize};
137
138    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
139    struct TestState(u32);
140
141    impl crate::kernel::state::KernelState for TestState {
142        fn version(&self) -> u32 {
143            1
144        }
145    }
146
147    #[test]
148    fn resume_injects_decision() {
149        let events = InMemoryEventStore::new();
150        let run_id: RunId = "r1".into();
151        events
152            .append(
153                &run_id,
154                &[
155                    Event::StateUpdated {
156                        step_id: Some("a".into()),
157                        payload: serde_json::to_value(&TestState(1)).unwrap(),
158                    },
159                    Event::Interrupted {
160                        value: serde_json::json!({"reason": "ask"}),
161                    },
162                ],
163            )
164            .unwrap();
165
166        let resume = ReplayResume::<TestState> {
167            events: Box::new(events),
168            reducer: Box::new(StateUpdatedOnlyReducer),
169        };
170
171        let result = resume
172            .resume(
173                &run_id,
174                ResumeDecision::new(serde_json::json!("user input")),
175                TestState(0),
176            )
177            .unwrap();
178
179        assert_eq!(result.events_replayed, 3); // 2 original + 1 injected Resumed
180        assert!(!result.idempotent);
181    }
182
183    #[test]
184    fn resume_idempotent_twice() {
185        // This test verifies the idempotent flag is set correctly when resuming
186        // Note: In a real system, the event store would be updated after resume.
187        // Here we just verify the resume logic works.
188        let events = InMemoryEventStore::new();
189        let run_id: RunId = "r2".into();
190        events
191            .append(
192                &run_id,
193                &[
194                    Event::StateUpdated {
195                        step_id: Some("a".into()),
196                        payload: serde_json::to_value(&TestState(1)).unwrap(),
197                    },
198                    Event::Interrupted {
199                        value: serde_json::json!({}),
200                    },
201                ],
202            )
203            .unwrap();
204
205        let resume = ReplayResume::<TestState> {
206            events: Box::new(events),
207            reducer: Box::new(StateUpdatedOnlyReducer),
208        };
209
210        // First resume - should not be idempotent
211        let result1 = resume
212            .resume(
213                &run_id,
214                ResumeDecision::new(serde_json::json!("first")),
215                TestState(0),
216            )
217            .unwrap();
218
219        // The idempotent flag checks if there's ALREADY a Resumed in the store
220        // Since we don't write back, it will still be false
221        // This is expected behavior for in-memory testing
222        assert!(!result1.idempotent);
223    }
224
225    #[test]
226    fn verify_idempotent_returns_true() {
227        let events = InMemoryEventStore::new();
228        let run_id: RunId = "r3".into();
229        events
230            .append(
231                &run_id,
232                &[
233                    Event::StateUpdated {
234                        step_id: Some("a".into()),
235                        payload: serde_json::to_value(&TestState(5)).unwrap(),
236                    },
237                    Event::Interrupted {
238                        value: serde_json::json!({}),
239                    },
240                ],
241            )
242            .unwrap();
243
244        let resume = ReplayResume::<TestState> {
245            events: Box::new(events),
246            reducer: Box::new(StateUpdatedOnlyReducer),
247        };
248
249        let is_idempotent = resume
250            .verify_idempotent(
251                &run_id,
252                ResumeDecision::new(serde_json::json!("test")),
253                TestState(0),
254            )
255            .unwrap();
256
257        assert!(is_idempotent);
258    }
259}