1use serde::{Deserialize, Serialize};
7
8use crate::kernel::event::{Event, SequencedEvent};
9use crate::kernel::identity::{RunId, Seq};
10use crate::kernel::reducer::Reducer;
11use crate::kernel::state::KernelState;
12use crate::kernel::EventStore;
13use crate::kernel::KernelError;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct ResumeDecision {
18 pub value: serde_json::Value,
20 pub metadata: Option<serde_json::Value>,
22}
23
24impl ResumeDecision {
25 pub fn new(value: serde_json::Value) -> Self {
26 Self {
27 value,
28 metadata: None,
29 }
30 }
31
32 pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
33 self.metadata = Some(metadata);
34 self
35 }
36}
37
38#[derive(Clone, Debug, Serialize, Deserialize)]
40pub struct ResumeResult<S: KernelState> {
41 pub state: S,
43 pub events_replayed: usize,
45 pub idempotent: bool,
47}
48
49pub struct ReplayResume<S: KernelState> {
51 pub events: Box<dyn EventStore>,
53 pub reducer: Box<dyn Reducer<S>>,
55}
56
57impl<S: KernelState + PartialEq> ReplayResume<S> {
58 pub fn resume(
63 &self,
64 run_id: &RunId,
65 decision: ResumeDecision,
66 initial_state: S,
67 ) -> Result<ResumeResult<S>, KernelError> {
68 let events = self.events.scan(run_id, 1)?;
70 let mut state = initial_state;
71
72 let already_resumed = events
74 .last()
75 .map(|se| matches!(se.event, Event::Resumed { .. }))
76 .unwrap_or(false);
77
78 let replay_events: Vec<_> = events
80 .iter()
81 .filter(|se| !matches!(se.event, Event::Resumed { .. }))
82 .cloned()
83 .collect();
84
85 let mut events_replayed = 0;
86 for se in &replay_events {
87 self.reducer.apply(&mut state, &se)?;
88 events_replayed += 1;
89 }
90
91 if !already_resumed {
93 let resume_seq = (replay_events.len() + 1) as Seq;
94 let resume_event = SequencedEvent {
95 seq: resume_seq,
96 event: Event::Resumed {
97 value: decision.value,
98 },
99 };
100 self.reducer.apply(&mut state, &resume_event)?;
101 events_replayed += 1;
102 }
103
104 Ok(ResumeResult {
105 state,
106 events_replayed,
107 idempotent: already_resumed,
108 })
109 }
110
111 pub fn verify_idempotent(
113 &self,
114 run_id: &RunId,
115 decision: ResumeDecision,
116 initial_state: S,
117 ) -> Result<bool, KernelError> {
118 let result1 = self.resume(run_id, decision.clone(), initial_state.clone())?;
119 let result2 = self.resume(run_id, decision, initial_state)?;
120
121 if result1.events_replayed != result2.events_replayed {
123 return Ok(false);
124 }
125
126 Ok(result1.state == result2.state)
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::event_store::InMemoryEventStore;
    use crate::kernel::StateUpdatedOnlyReducer;
    use serde::{Deserialize, Serialize};

    /// Minimal state used to drive the reducer in these tests.
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct TestState(u32);

    impl crate::kernel::state::KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    /// Builds a `ReplayResume` over an in-memory store holding one
    /// `StateUpdated` event (payload `TestState(state)`, step `"a"`) followed
    /// by one `Interrupted` event carrying `interrupt`.
    fn interrupted_run(
        run: &'static str,
        state: u32,
        interrupt: serde_json::Value,
    ) -> (RunId, ReplayResume<TestState>) {
        let run_id: RunId = run.into();
        let store = InMemoryEventStore::new();
        let history = [
            Event::StateUpdated {
                step_id: Some("a".into()),
                payload: serde_json::to_value(&TestState(state)).unwrap(),
            },
            Event::Interrupted { value: interrupt },
        ];
        store.append(&run_id, &history).unwrap();

        let resume = ReplayResume::<TestState> {
            events: Box::new(store),
            reducer: Box::new(StateUpdatedOnlyReducer),
        };
        (run_id, resume)
    }

    #[test]
    fn resume_injects_decision() {
        let (run_id, resume) = interrupted_run("r1", 1, serde_json::json!({"reason": "ask"}));

        let decision = ResumeDecision::new(serde_json::json!("user input"));
        let result = resume.resume(&run_id, decision, TestState(0)).unwrap();

        // Two stored events plus the injected resume event.
        assert_eq!(result.events_replayed, 3);
        assert!(!result.idempotent);
    }

    #[test]
    fn resume_idempotent_twice() {
        let (run_id, resume) = interrupted_run("r2", 1, serde_json::json!({}));

        let decision = ResumeDecision::new(serde_json::json!("first"));
        let result1 = resume.resume(&run_id, decision, TestState(0)).unwrap();

        // Only the first resume is checked: the stored history ends with
        // `Interrupted`, so this call must not be reported as idempotent.
        assert!(!result1.idempotent);
    }

    #[test]
    fn verify_idempotent_returns_true() {
        let (run_id, resume) = interrupted_run("r3", 5, serde_json::json!({}));

        let decision = ResumeDecision::new(serde_json::json!("test"));
        let is_idempotent = resume
            .verify_idempotent(&run_id, decision, TestState(0))
            .unwrap();

        assert!(is_idempotent);
    }
}