1use std::time::Duration;
4
5use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
6use crate::kernel::determinism_guard::DeterminismGuard;
7use crate::kernel::event::{Event, EventStore, SequencedEvent};
8use crate::kernel::identity::{RunId, Seq};
9use crate::kernel::kernel_mode::KernelMode;
10use crate::kernel::policy::{Policy, PolicyCtx, RetryDecision};
11use crate::kernel::reducer::Reducer;
12use crate::kernel::runtime_effect::{EffectSink, RuntimeEffect};
13use crate::kernel::snapshot::{Snapshot, SnapshotStore};
14use crate::kernel::state::KernelState;
15use crate::kernel::step::{InterruptInfo, Next, StepFn};
16use crate::kernel::timeline;
17use crate::kernel::KernelError;
18
19#[derive(Clone, Debug)]
21pub enum RunStatus {
22 Completed,
24 Blocked(BlockedInfo),
26 Running,
28 Failed { recoverable: bool },
30}
31
32#[derive(Clone, Debug)]
33pub struct BlockedInfo {
34 pub interrupt: Option<InterruptInfo>,
35 pub wait_signal: Option<String>,
36}
37
38#[derive(Clone, Debug)]
40pub enum Signal {
41 Resume(serde_json::Value),
42 Signal {
43 name: String,
44 value: serde_json::Value,
45 },
46}
47
48pub struct Kernel<S: KernelState> {
50 pub events: Box<dyn EventStore>,
51 pub snaps: Option<Box<dyn SnapshotStore<S>>>,
52 pub reducer: Box<dyn Reducer<S>>,
53 pub exec: Box<dyn ActionExecutor>,
54 pub step: Box<dyn StepFn<S>>,
55 pub policy: Box<dyn Policy>,
56 pub effect_sink: Option<Box<dyn EffectSink>>,
58 pub mode: KernelMode,
60}
61
62impl<S: KernelState> Kernel<S> {
63 pub fn determinism_guard(&self) -> DeterminismGuard {
65 DeterminismGuard::new(self.mode)
66 }
67
68 pub fn run_until_blocked(
71 &self,
72 run_id: &RunId,
73 initial_state: S,
74 ) -> Result<RunStatus, KernelError> {
75 self.run_loop(run_id, initial_state)
76 }
77
78 pub fn resume(
81 &self,
82 run_id: &RunId,
83 initial_state: S,
84 signal: Signal,
85 ) -> Result<RunStatus, KernelError> {
86 let value = match &signal {
87 Signal::Resume(v) => v.clone(),
88 Signal::Signal { value, .. } => value.clone(),
89 };
90 self.events.append(run_id, &[Event::Resumed { value }])?;
91 self.run_loop(run_id, initial_state)
92 }
93
94 fn run_loop(&self, run_id: &RunId, initial_state: S) -> Result<RunStatus, KernelError> {
96 let mut state = self.restore_state(run_id, initial_state)?;
97
98 loop {
99 let next = self.step.next(&state)?;
100 match next {
101 Next::Emit(evs) => {
102 if let Some(sink) = &self.effect_sink {
103 for ev in &evs {
104 if let Event::StateUpdated { step_id, payload } = ev {
105 sink.record(
106 run_id,
107 &RuntimeEffect::StateWrite {
108 step_id: step_id.clone(),
109 payload: payload.clone(),
110 },
111 );
112 }
113 }
114 }
115 if !evs.is_empty() {
116 self.append_and_apply(run_id, &mut state, &evs)?;
117 }
118 }
119 Next::Do(action) => {
120 if let Some(sink) = &self.effect_sink {
121 match &action {
122 Action::CallLLM { provider, input } => {
123 sink.record(
124 run_id,
125 &RuntimeEffect::LLMCall {
126 provider: provider.clone(),
127 input: input.clone(),
128 },
129 );
130 }
131 Action::CallTool { tool, input } => {
132 sink.record(
133 run_id,
134 &RuntimeEffect::ToolCall {
135 tool: tool.clone(),
136 input: input.clone(),
137 },
138 );
139 }
140 _ => {}
141 }
142 }
143 self.policy
144 .authorize(run_id, &action, &PolicyCtx::default())?;
145 let before = self.events.head(run_id)?;
146 let action_id = format!("{}-{}", run_id, before + 1);
147 let payload = serde_json::to_value(&action)
148 .map_err(|e| KernelError::Driver(e.to_string()))?;
149 self.append_and_apply(
150 run_id,
151 &mut state,
152 &[Event::ActionRequested {
153 action_id: action_id.clone(),
154 payload,
155 }],
156 )?;
157 let result = self.exec.execute(run_id, &action);
158 match result {
159 Ok(ActionResult::Success(output)) => {
160 self.append_and_apply(
161 run_id,
162 &mut state,
163 &[Event::ActionSucceeded {
164 action_id: action_id.clone(),
165 output,
166 }],
167 )?;
168 }
169 Ok(ActionResult::Failure(error)) => {
170 self.append_and_apply(
171 run_id,
172 &mut state,
173 &[Event::ActionFailed { action_id, error }],
174 )?;
175 return Ok(RunStatus::Failed { recoverable: false });
176 }
177 Err(mut e) => {
178 let mut attempt = 0u32;
179 loop {
180 let action_err = ActionError::from_kernel_error(&e);
181 let decision = self.policy.retry_strategy_attempt(
182 &action_err,
183 &action,
184 attempt,
185 );
186 match decision {
187 RetryDecision::Fail => {
188 self.append_and_apply(
189 run_id,
190 &mut state,
191 &[Event::ActionFailed {
192 action_id: action_id.clone(),
193 error: e.to_string(),
194 }],
195 )?;
196 return Ok(RunStatus::Failed { recoverable: false });
197 }
198 RetryDecision::Retry | RetryDecision::RetryAfterMs(0) => {}
199 RetryDecision::RetryAfterMs(ms) => {
200 std::thread::sleep(Duration::from_millis(ms));
201 }
202 }
203 attempt += 1;
204 match self.exec.execute(run_id, &action) {
205 Ok(ActionResult::Success(output)) => {
206 self.append_and_apply(
207 run_id,
208 &mut state,
209 &[Event::ActionSucceeded {
210 action_id: action_id.clone(),
211 output,
212 }],
213 )?;
214 break;
215 }
216 Ok(ActionResult::Failure(error)) => {
217 self.append_and_apply(
218 run_id,
219 &mut state,
220 &[Event::ActionFailed {
221 action_id: action_id.clone(),
222 error,
223 }],
224 )?;
225 return Ok(RunStatus::Failed { recoverable: false });
226 }
227 Err(e2) => e = e2,
228 }
229 }
230 }
231 }
232 }
233 Next::Interrupt(info) => {
234 if let Some(sink) = &self.effect_sink {
235 sink.record(
236 run_id,
237 &RuntimeEffect::InterruptRaise {
238 value: info.value.clone(),
239 },
240 );
241 }
242 self.append_and_apply(
243 run_id,
244 &mut state,
245 &[Event::Interrupted {
246 value: info.value.clone(),
247 }],
248 )?;
249 return Ok(RunStatus::Blocked(BlockedInfo {
250 interrupt: Some(info),
251 wait_signal: None,
252 }));
253 }
254 Next::Complete => {
255 self.append_and_apply(run_id, &mut state, &[Event::Completed])?;
256 return Ok(RunStatus::Completed);
257 }
258 }
259 }
260 }
261
262 fn restore_state(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
263 const FROM_SEQ: Seq = 1;
264 let latest_snapshot = self.load_latest_snapshot(run_id)?;
265 let (mut state, from_seq) = match latest_snapshot {
266 Some(snapshot) => (snapshot.state, snapshot.at_seq + 1),
267 None => (initial_state, FROM_SEQ),
268 };
269 let sequenced = self.events.scan(run_id, from_seq)?;
270 self.apply_events(run_id, &mut state, sequenced)?;
271 Ok(state)
272 }
273
274 fn load_latest_snapshot(&self, run_id: &RunId) -> Result<Option<Snapshot<S>>, KernelError> {
275 match &self.snaps {
276 Some(store) => store.load_latest(run_id),
277 None => Ok(None),
278 }
279 }
280
281 fn append_and_apply(
282 &self,
283 run_id: &RunId,
284 state: &mut S,
285 events: &[Event],
286 ) -> Result<(), KernelError> {
287 if events.is_empty() {
288 return Ok(());
289 }
290 let before = self.events.head(run_id)?;
291 self.events.append(run_id, events)?;
292 let sequenced = self.events.scan(run_id, before + 1)?;
293 self.apply_events(run_id, state, sequenced)
294 }
295
296 fn apply_events(
297 &self,
298 run_id: &RunId,
299 state: &mut S,
300 sequenced: Vec<SequencedEvent>,
301 ) -> Result<(), KernelError> {
302 for se in sequenced {
303 self.reducer.apply(state, &se)?;
304 self.save_snapshot(run_id, se.seq, state)?;
305 }
306 Ok(())
307 }
308
309 fn save_snapshot(&self, run_id: &RunId, at_seq: Seq, state: &S) -> Result<(), KernelError> {
310 if let Some(store) = &self.snaps {
311 store.save(&Snapshot {
312 run_id: run_id.clone(),
313 at_seq,
314 state: state.clone(),
315 })?;
316 }
317 Ok(())
318 }
319
320 pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
326 self.replay_from(run_id, initial_state, None)
327 }
328
329 pub fn replay_from_snapshot(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
335 let from_snap = self.load_latest_snapshot(run_id)?;
336 self.replay_from(run_id, initial_state, from_snap.as_ref())
337 }
338
339 fn replay_from(
341 &self,
342 run_id: &RunId,
343 initial_state: S,
344 snapshot: Option<&Snapshot<S>>,
345 ) -> Result<S, KernelError> {
346 const FROM_SEQ: Seq = 1;
347 let (mut state, from_seq) = match snapshot {
348 Some(snap) => (snap.state.clone(), snap.at_seq + 1),
349 None => (initial_state, FROM_SEQ),
350 };
351 let sequenced = self.events.scan(run_id, from_seq)?;
352 for se in sequenced {
353 self.reducer.apply(&mut state, &se)?;
354 }
355 Ok(state)
356 }
357
358 pub fn run_timeline(&self, run_id: &RunId) -> Result<timeline::RunTimeline, KernelError> {
360 timeline::run_timeline(self.events.as_ref(), run_id)
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
    use crate::kernel::event::Event;
    use crate::kernel::event_store::{InMemoryEventStore, SharedEventStore};
    use crate::kernel::policy::RetryWithBackoffPolicy;
    use crate::kernel::runtime_effect::RuntimeEffect;
    use crate::kernel::snapshot::{InMemorySnapshotStore, SnapshotStore};
    use crate::kernel::stubs::{AllowAllPolicy, NoopActionExecutor, NoopStepFn};
    use crate::kernel::StateUpdatedOnlyReducer;
    use serde::{Deserialize, Serialize};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
    struct TestState(u32);
    impl KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    /// Baseline kernel: in-memory log, no snapshots, noop step/executor,
    /// allow-all policy, no effect sink, normal mode. Tests override the
    /// fields they care about with struct-update syntax.
    fn test_kernel() -> Kernel<TestState> {
        Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        }
    }

    /// Counts the sequenced events whose payload satisfies `pred`.
    fn count_where(events: &[SequencedEvent], pred: impl Fn(&Event) -> bool) -> usize {
        events.iter().filter(|se| pred(&se.event)).count()
    }

    /// Executor that counts invocations and always fails with a driver error.
    struct CountingActionExecutor(Arc<AtomicUsize>);
    impl CountingActionExecutor {
        fn new(counter: Arc<AtomicUsize>) -> Self {
            Self(counter)
        }
    }
    impl ActionExecutor for CountingActionExecutor {
        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
            self.0.fetch_add(1, Ordering::SeqCst);
            Err(KernelError::Driver("mock".into()))
        }
    }

    /// Lets a test keep a handle on the snapshot store it gives to the kernel.
    struct SharedSnapshotStoreHandle<S>(Arc<InMemorySnapshotStore<S>>);
    impl<S: Clone + Send + Sync> SnapshotStore<S> for SharedSnapshotStoreHandle<S> {
        fn load_latest(&self, run_id: &RunId) -> Result<Option<Snapshot<S>>, KernelError> {
            self.0.load_latest(run_id)
        }

        fn save(&self, snapshot: &Snapshot<S>) -> Result<(), KernelError> {
            self.0.save(snapshot)
        }
    }

    /// Reducer that counts applications and replaces state on `StateUpdated`.
    struct CountingStateReducer(Arc<AtomicUsize>);
    impl Reducer<TestState> for CountingStateReducer {
        fn apply(&self, state: &mut TestState, event: &SequencedEvent) -> Result<(), KernelError> {
            self.0.fetch_add(1, Ordering::SeqCst);
            if let Event::StateUpdated { payload, .. } = &event.event {
                *state = serde_json::from_value(payload.clone())
                    .map_err(|e| KernelError::Reducer(e.to_string()))?;
            }
            Ok(())
        }
    }

    /// Effect sink that collects every recorded effect for later inspection.
    struct CollectingEffectSink(Arc<Mutex<Vec<(String, RuntimeEffect)>>>);
    impl crate::kernel::runtime_effect::EffectSink for CollectingEffectSink {
        fn record(&self, run_id: &RunId, effect: &RuntimeEffect) {
            self.0
                .lock()
                .unwrap()
                .push((run_id.clone(), effect.clone()));
        }
    }

    /// Emits one `StateUpdated(TestState(1))` from step "node1", then completes.
    struct EmitOnceThenCompleteStep(AtomicUsize);
    impl StepFn<TestState> for EmitOnceThenCompleteStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            if self.0.fetch_add(1, Ordering::SeqCst) == 0 {
                Ok(Next::Emit(vec![Event::StateUpdated {
                    step_id: Some("node1".into()),
                    payload: serde_json::to_value(&TestState(1)).unwrap(),
                }]))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    /// Interrupts with "pause" when constructed with `false`; completes otherwise.
    struct InterruptOnceStep(bool);
    impl StepFn<TestState> for InterruptOnceStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            if !self.0 {
                Ok(Next::Interrupt(InterruptInfo {
                    value: serde_json::json!("pause"),
                }))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    /// Requests one `CallTool { tool: "dummy" }` action, then completes.
    struct DoThenCompleteStep {
        called: AtomicUsize,
    }
    impl DoThenCompleteStep {
        fn new() -> Self {
            Self {
                called: AtomicUsize::new(0),
            }
        }
    }
    impl StepFn<TestState> for DoThenCompleteStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            if self.called.fetch_add(1, Ordering::SeqCst) == 0 {
                Ok(Next::Do(Action::CallTool {
                    tool: "dummy".into(),
                    input: serde_json::json!(null),
                }))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    /// Fails transiently for the first `fail_up_to` calls, then succeeds with "ok".
    struct TransientThenSuccessExecutor {
        fail_count: AtomicUsize,
        fail_up_to: usize,
    }
    impl TransientThenSuccessExecutor {
        fn new(fail_up_to: usize) -> Self {
            Self {
                fail_count: AtomicUsize::new(0),
                fail_up_to,
            }
        }
    }
    impl ActionExecutor for TransientThenSuccessExecutor {
        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
            let n = self.fail_count.fetch_add(1, Ordering::SeqCst);
            if n < self.fail_up_to {
                Err(KernelError::Executor(ActionError::transient("transient")))
            } else {
                Ok(ActionResult::Success(serde_json::json!("ok")))
            }
        }
    }

    /// Returns pre-scripted responses in order; errors once the script runs out.
    struct ScriptedActionExecutor {
        responses: Mutex<Vec<Result<ActionResult, KernelError>>>,
        calls: AtomicUsize,
    }
    impl ScriptedActionExecutor {
        fn new(responses: Vec<Result<ActionResult, KernelError>>) -> Self {
            Self {
                responses: Mutex::new(responses),
                calls: AtomicUsize::new(0),
            }
        }
        fn calls(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }
    }
    impl ActionExecutor for ScriptedActionExecutor {
        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            let mut guard = self.responses.lock().unwrap();
            if guard.is_empty() {
                return Err(KernelError::Driver("missing scripted response".into()));
            }
            guard.remove(0)
        }
    }

    /// Lets a test keep a handle on a scripted executor owned by the kernel.
    struct ArcExecutor(Arc<ScriptedActionExecutor>);
    impl ActionExecutor for ArcExecutor {
        fn execute(&self, run_id: &RunId, action: &Action) -> Result<ActionResult, KernelError> {
            self.0.execute(run_id, action)
        }
    }

    // ------------------------------------------------------------------
    // Run loop
    // ------------------------------------------------------------------

    #[test]
    fn run_until_blocked_complete() {
        let k = test_kernel();
        let run_id = "run-complete".to_string();
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
    }

    #[test]
    fn run_until_blocked_persists_latest_snapshot_on_completion() {
        let snapshots = Arc::new(InMemorySnapshotStore::new());
        let k = Kernel::<TestState> {
            snaps: Some(Box::new(SharedSnapshotStoreHandle(Arc::clone(&snapshots)))),
            step: Box::new(EmitOnceThenCompleteStep(AtomicUsize::new(0))),
            ..test_kernel()
        };
        let run_id = "run-snapshot-complete".to_string();
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));

        let snapshot = snapshots.load_latest(&run_id).unwrap().unwrap();
        assert_eq!(
            snapshot.at_seq, 2,
            "StateUpdated + Completed should both checkpoint"
        );
        assert_eq!(snapshot.state, TestState(1));
    }

    #[test]
    fn kernel_replay_mode_determinism_guard_traps_clock() {
        let k = Kernel::<TestState> {
            mode: KernelMode::Replay,
            ..test_kernel()
        };
        let guard = k.determinism_guard();
        let err = guard.check_clock_access().unwrap_err();
        assert!(err.to_string().contains("Replay"));
    }

    #[test]
    fn effect_sink_captures_state_write_and_complete() {
        let effects: Arc<Mutex<Vec<(String, RuntimeEffect)>>> = Arc::new(Mutex::new(Vec::new()));
        let k = Kernel::<TestState> {
            step: Box::new(EmitOnceThenCompleteStep(AtomicUsize::new(0))),
            effect_sink: Some(Box::new(CollectingEffectSink(Arc::clone(&effects)))),
            ..test_kernel()
        };
        let run_id = "run-effect-capture".to_string();
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));

        let recorded = effects.lock().unwrap();
        assert_eq!(
            recorded.len(),
            1,
            "one effect (StateWrite) should be captured"
        );
        match &recorded[0].1 {
            RuntimeEffect::StateWrite { step_id, payload } => {
                assert_eq!(step_id.as_deref(), Some("node1"));
                let s: TestState = serde_json::from_value(payload.clone()).unwrap();
                assert_eq!(s.0, 1);
            }
            _ => panic!("expected StateWrite"),
        }
    }

    #[test]
    fn run_timeline_after_complete_has_events_and_json() {
        let k = test_kernel();
        let run_id = "timeline-run".to_string();
        let _ = k.run_until_blocked(&run_id, TestState(0)).unwrap();

        let tl = k.run_timeline(&run_id).unwrap();
        assert_eq!(tl.run_id, run_id);
        let kinds: Vec<&str> = tl.events.iter().map(|e| e.kind.as_str()).collect();
        assert!(
            kinds.contains(&"Completed"),
            "timeline should contain Completed"
        );
        let json = serde_json::to_string(&tl).unwrap();
        assert!(json.contains("Completed"));
    }

    #[test]
    fn run_until_blocked_then_resume() {
        let inner = Arc::new(InMemoryEventStore::new());
        let run_id = "run-interrupt-resume".to_string();

        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&inner))),
            step: Box::new(InterruptOnceStep(false)),
            ..test_kernel()
        };
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Blocked(_)));

        // A fresh kernel over the same log models resuming after a restart.
        let k2 = Kernel::<TestState> {
            events: Box::new(SharedEventStore(inner)),
            step: Box::new(InterruptOnceStep(true)),
            ..test_kernel()
        };
        let status2 = k2
            .resume(&run_id, TestState(0), Signal::Resume(serde_json::json!(1)))
            .unwrap();
        assert!(matches!(status2, RunStatus::Completed));
    }

    #[test]
    fn interrupt_saves_snapshot_before_returning_blocked() {
        let snapshots = Arc::new(InMemorySnapshotStore::new());
        let k = Kernel::<TestState> {
            snaps: Some(Box::new(SharedSnapshotStoreHandle(Arc::clone(&snapshots)))),
            step: Box::new(InterruptOnceStep(false)),
            ..test_kernel()
        };
        let run_id = "run-interrupt-checkpoint".to_string();

        let status = k.run_until_blocked(&run_id, TestState(7)).unwrap();
        assert!(matches!(status, RunStatus::Blocked(_)));

        let snapshot = snapshots.load_latest(&run_id).unwrap().unwrap();
        assert_eq!(snapshot.at_seq, 1);
        assert_eq!(snapshot.state, TestState(7));
    }

    // ------------------------------------------------------------------
    // Replay
    // ------------------------------------------------------------------

    #[test]
    fn replay_no_side_effects() {
        let exec_count = Arc::new(AtomicUsize::new(0));
        let store = InMemoryEventStore::new();
        let run_id = "replay-no-effects".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("n1".into()),
                        payload: serde_json::json!(42),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let k = Kernel::<TestState> {
            events: Box::new(store),
            exec: Box::new(CountingActionExecutor::new(Arc::clone(&exec_count))),
            ..test_kernel()
        };
        let _ = k.replay(&run_id, TestState(0)).unwrap();
        assert_eq!(
            exec_count.load(Ordering::SeqCst),
            0,
            "replay must not call executor"
        );
    }

    #[test]
    fn replay_state_equivalence() {
        let store = InMemoryEventStore::new();
        let run_id = "replay-equiv".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("a".into()),
                        payload: serde_json::json!(10),
                    },
                    Event::StateUpdated {
                        step_id: Some("b".into()),
                        payload: serde_json::json!(20),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::new(store))),
            ..test_kernel()
        };
        let initial = TestState(0);
        let s1 = k.replay(&run_id, initial.clone()).unwrap();
        let s2 = k.replay(&run_id, initial).unwrap();
        assert_eq!(s1, s2, "same log and initial state must yield equal state");
        assert_eq!(s1.0, 20);
    }

    #[test]
    fn replay_from_snapshot_applies_tail_only() {
        let store = InMemoryEventStore::new();
        let run_id = "replay-snap".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("a".into()),
                        payload: serde_json::json!(10),
                    },
                    Event::StateUpdated {
                        step_id: Some("b".into()),
                        payload: serde_json::json!(20),
                    },
                    Event::StateUpdated {
                        step_id: Some("c".into()),
                        payload: serde_json::json!(30),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let snaps = InMemorySnapshotStore::new();
        snaps
            .save(&Snapshot {
                run_id: run_id.clone(),
                at_seq: 2,
                state: TestState(20),
            })
            .unwrap();
        let k = Kernel::<TestState> {
            events: Box::new(store),
            snaps: Some(Box::new(snaps)),
            ..test_kernel()
        };
        let state = k.replay_from_snapshot(&run_id, TestState(0)).unwrap();
        assert_eq!(state.0, 30, "only events after at_seq=2 (seq 3) applied");
    }

    #[test]
    fn run_loop_replays_only_tail_after_loading_latest_snapshot() {
        let store = InMemoryEventStore::new();
        let run_id = "run-tail-only".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("a".into()),
                        payload: serde_json::json!(1),
                    },
                    Event::StateUpdated {
                        step_id: Some("b".into()),
                        payload: serde_json::json!(2),
                    },
                    Event::StateUpdated {
                        step_id: Some("c".into()),
                        payload: serde_json::json!(3),
                    },
                ],
            )
            .unwrap();

        let snapshots = Arc::new(InMemorySnapshotStore::new());
        snapshots
            .save(&Snapshot {
                run_id: run_id.clone(),
                at_seq: 2,
                state: TestState(2),
            })
            .unwrap();
        let apply_count = Arc::new(AtomicUsize::new(0));
        let k = Kernel::<TestState> {
            events: Box::new(store),
            snaps: Some(Box::new(SharedSnapshotStoreHandle(Arc::clone(&snapshots)))),
            reducer: Box::new(CountingStateReducer(Arc::clone(&apply_count))),
            ..test_kernel()
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
        assert_eq!(
            apply_count.load(Ordering::SeqCst),
            2,
            "only the tail event plus the new Completed event should be reduced"
        );

        let snapshot = snapshots.load_latest(&run_id).unwrap().unwrap();
        assert_eq!(snapshot.at_seq, 4);
        assert_eq!(snapshot.state, TestState(3));
    }

    // ------------------------------------------------------------------
    // Actions, failures and retries
    // ------------------------------------------------------------------

    #[test]
    fn run_until_blocked_retry_then_success() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-retry-ok".to_string();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            exec: Box::new(TransientThenSuccessExecutor::new(2)),
            step: Box::new(DoThenCompleteStep::new()),
            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
            ..test_kernel()
        };
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));

        let events = store.scan(&run_id, 1).unwrap();
        let succeeded = count_where(&events, |e| matches!(e, Event::ActionSucceeded { .. }));
        let failed = count_where(&events, |e| matches!(e, Event::ActionFailed { .. }));
        assert_eq!(succeeded, 1, "exactly one ActionSucceeded after retries");
        assert_eq!(failed, 0, "no ActionFailed when retries eventually succeed");
    }

    #[test]
    fn run_until_blocked_failure_recovery() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-fail".to_string();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            exec: Box::new(CountingActionExecutor::new(Arc::new(AtomicUsize::new(0)))),
            step: Box::new(DoThenCompleteStep::new()),
            ..test_kernel()
        };
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(
            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
            "default policy says Fail so recoverable should be false"
        );

        let events = store.scan(&run_id, 1).unwrap();
        let has_requested =
            count_where(&events, |e| matches!(e, Event::ActionRequested { .. })) > 0;
        let has_failed = count_where(&events, |e| matches!(e, Event::ActionFailed { .. })) > 0;
        assert!(
            has_requested && has_failed,
            "event log must contain ActionRequested and ActionFailed for the same action"
        );
    }

    #[test]
    fn action_result_failure_returns_failed_and_single_terminal_event() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-action-failure".to_string();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            exec: Box::new(ScriptedActionExecutor::new(vec![Ok(
                ActionResult::Failure("boom".into()),
            )])),
            step: Box::new(DoThenCompleteStep::new()),
            ..test_kernel()
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(
            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
            "ActionResult::Failure must fail the run"
        );

        let events = store.scan(&run_id, 1).unwrap();
        let mut requested_id: Option<String> = None;
        let mut success_count = 0usize;
        let mut failed_count = 0usize;
        for e in &events {
            match &e.event {
                Event::ActionRequested { action_id, .. } => requested_id = Some(action_id.clone()),
                Event::ActionSucceeded { .. } => success_count += 1,
                Event::ActionFailed { action_id, .. } => {
                    failed_count += 1;
                    assert_eq!(
                        requested_id.as_deref(),
                        Some(action_id.as_str()),
                        "failed event must match requested action_id"
                    );
                }
                _ => {}
            }
        }
        assert_eq!(success_count, 0);
        assert_eq!(
            failed_count, 1,
            "only one terminal failure event is expected"
        );
    }

    #[test]
    fn retry_then_success_has_single_terminal_success_event() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-retry-success".to_string();
        let exec = Arc::new(ScriptedActionExecutor::new(vec![
            Err(KernelError::Executor(ActionError::transient("transient-1"))),
            Err(KernelError::Executor(ActionError::transient("transient-2"))),
            Ok(ActionResult::Success(serde_json::json!("ok"))),
        ]));
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            exec: Box::new(ArcExecutor(Arc::clone(&exec))),
            step: Box::new(DoThenCompleteStep::new()),
            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
            ..test_kernel()
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
        assert_eq!(exec.calls(), 3, "executor should be called for retries");

        let events = store.scan(&run_id, 1).unwrap();
        let requested_count = count_where(&events, |e| matches!(e, Event::ActionRequested { .. }));
        let success_count = count_where(&events, |e| matches!(e, Event::ActionSucceeded { .. }));
        let failed_count = count_where(&events, |e| matches!(e, Event::ActionFailed { .. }));
        assert_eq!(
            requested_count, 1,
            "retry must not duplicate ActionRequested"
        );
        assert_eq!(
            success_count, 1,
            "exactly one terminal success event expected"
        );
        assert_eq!(failed_count, 0, "success path must not emit ActionFailed");
    }

    #[test]
    fn retry_exhausted_has_single_terminal_failed_event() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-retry-fail".to_string();
        let exec = Arc::new(ScriptedActionExecutor::new(vec![
            Err(KernelError::Executor(ActionError::transient("transient-1"))),
            Err(KernelError::Executor(ActionError::transient("transient-2"))),
            Err(KernelError::Executor(ActionError::transient("transient-3"))),
        ]));
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            exec: Box::new(ArcExecutor(Arc::clone(&exec))),
            step: Box::new(DoThenCompleteStep::new()),
            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 1, 0)),
            ..test_kernel()
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(
            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
            "exhausted retries should fail run"
        );
        assert_eq!(exec.calls(), 2, "max_retries=1 should execute twice");

        let events = store.scan(&run_id, 1).unwrap();
        let requested_count = count_where(&events, |e| matches!(e, Event::ActionRequested { .. }));
        let success_count = count_where(&events, |e| matches!(e, Event::ActionSucceeded { .. }));
        let failed_count = count_where(&events, |e| matches!(e, Event::ActionFailed { .. }));
        assert_eq!(
            requested_count, 1,
            "retry must not duplicate ActionRequested"
        );
        assert_eq!(success_count, 0);
        assert_eq!(
            failed_count, 1,
            "exactly one terminal failed event expected"
        );
    }
}