1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
//! Replay-based resume: guarantee idempotent resume semantics.
//!
//! This module provides [ReplayResume] which enforces that resuming a suspended state
//! is strictly Replay + Inject Decision, ensuring no reliance on active memory.
use serde::{Deserialize, Serialize};
use crate::kernel::event::{Event, SequencedEvent};
use crate::kernel::identity::{RunId, Seq};
use crate::kernel::reducer::Reducer;
use crate::kernel::state::KernelState;
use crate::kernel::EventStore;
use crate::kernel::KernelError;
/// Resume decision: the value injected when resuming from an interrupt.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResumeDecision {
/// The resume value (e.g. user input, approved tool result).
pub value: serde_json::Value,
/// Optional metadata about the decision.
pub metadata: Option<serde_json::Value>,
}
impl ResumeDecision {
pub fn new(value: serde_json::Value) -> Self {
Self {
value,
metadata: None,
}
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = Some(metadata);
self
}
}
/// Result of a replay resume operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResumeResult<S: KernelState> {
/// The final state after replay + inject decision.
pub state: S,
/// Number of events replayed.
pub events_replayed: usize,
/// Whether this was an idempotent resume (already at the same point).
pub idempotent: bool,
}
/// Replay-based resume: enforces Replay + Inject Decision semantics.
pub struct ReplayResume<S: KernelState> {
/// Event store (source of truth).
pub events: Box<dyn EventStore>,
/// Reducer to apply events.
pub reducer: Box<dyn Reducer<S>>,
}
impl<S: KernelState + PartialEq> ReplayResume<S> {
/// Resumes execution by replaying from the event log and injecting the decision.
///
/// This ensures no reliance on active memory - the state is reconstructed purely
/// from the event log plus the injected decision.
pub fn resume(
&self,
run_id: &RunId,
decision: ResumeDecision,
initial_state: S,
) -> Result<ResumeResult<S>, KernelError> {
// 1. Get all events
let events = self.events.scan(run_id, 1)?;
let mut state = initial_state;
// 2. Check if already resumed (idempotent check) - must be done BEFORE injecting
let already_resumed = events
.last()
.map(|se| matches!(se.event, Event::Resumed { .. }))
.unwrap_or(false);
// 3. Replay all events that are not Resumed (skip existing Resumed events at end)
let replay_events: Vec<_> = events
.iter()
.filter(|se| !matches!(se.event, Event::Resumed { .. }))
.cloned()
.collect();
let mut events_replayed = 0;
for se in &replay_events {
self.reducer.apply(&mut state, &se)?;
events_replayed += 1;
}
// 4. If not already resumed, inject the decision
if !already_resumed {
let resume_seq = (replay_events.len() + 1) as Seq;
let resume_event = SequencedEvent {
seq: resume_seq,
event: Event::Resumed {
value: decision.value,
},
};
self.reducer.apply(&mut state, &resume_event)?;
events_replayed += 1;
}
Ok(ResumeResult {
state,
events_replayed,
idempotent: already_resumed,
})
}
/// Verifies that resuming N times yields identical results (idempotent).
pub fn verify_idempotent(
&self,
run_id: &RunId,
decision: ResumeDecision,
initial_state: S,
) -> Result<bool, KernelError> {
let result1 = self.resume(run_id, decision.clone(), initial_state.clone())?;
let result2 = self.resume(run_id, decision, initial_state)?;
// Both should have replayed the same number of events
if result1.events_replayed != result2.events_replayed {
return Ok(false);
}
// States should be identical
Ok(result1.state == result2.state)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::event_store::InMemoryEventStore;
use crate::kernel::StateUpdatedOnlyReducer;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct TestState(u32);
impl crate::kernel::state::KernelState for TestState {
fn version(&self) -> u32 {
1
}
}
#[test]
fn resume_injects_decision() {
let events = InMemoryEventStore::new();
let run_id: RunId = "r1".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(1)).unwrap(),
},
Event::Interrupted {
value: serde_json::json!({"reason": "ask"}),
},
],
)
.unwrap();
let resume = ReplayResume::<TestState> {
events: Box::new(events),
reducer: Box::new(StateUpdatedOnlyReducer),
};
let result = resume
.resume(
&run_id,
ResumeDecision::new(serde_json::json!("user input")),
TestState(0),
)
.unwrap();
assert_eq!(result.events_replayed, 3); // 2 original + 1 injected Resumed
assert!(!result.idempotent);
}
#[test]
fn resume_idempotent_twice() {
// This test verifies the idempotent flag is set correctly when resuming
// Note: In a real system, the event store would be updated after resume.
// Here we just verify the resume logic works.
let events = InMemoryEventStore::new();
let run_id: RunId = "r2".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(1)).unwrap(),
},
Event::Interrupted {
value: serde_json::json!({}),
},
],
)
.unwrap();
let resume = ReplayResume::<TestState> {
events: Box::new(events),
reducer: Box::new(StateUpdatedOnlyReducer),
};
// First resume - should not be idempotent
let result1 = resume
.resume(
&run_id,
ResumeDecision::new(serde_json::json!("first")),
TestState(0),
)
.unwrap();
// The idempotent flag checks if there's ALREADY a Resumed in the store
// Since we don't write back, it will still be false
// This is expected behavior for in-memory testing
assert!(!result1.idempotent);
}
#[test]
fn verify_idempotent_returns_true() {
let events = InMemoryEventStore::new();
let run_id: RunId = "r3".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(5)).unwrap(),
},
Event::Interrupted {
value: serde_json::json!({}),
},
],
)
.unwrap();
let resume = ReplayResume::<TestState> {
events: Box::new(events),
reducer: Box::new(StateUpdatedOnlyReducer),
};
let is_idempotent = resume
.verify_idempotent(
&run_id,
ResumeDecision::new(serde_json::json!("test")),
TestState(0),
)
.unwrap();
assert!(is_idempotent);
}
}