Skip to main content

oris_kernel/kernel/
driver.rs

1//! Kernel driver: run_until_blocked, resume, replay.
2
3use std::time::Duration;
4
5use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
6use crate::kernel::determinism_guard::DeterminismGuard;
7use crate::kernel::event::{Event, EventStore};
8use crate::kernel::identity::{RunId, Seq};
9use crate::kernel::kernel_mode::KernelMode;
10use crate::kernel::policy::{Policy, PolicyCtx, RetryDecision};
11use crate::kernel::reducer::Reducer;
12use crate::kernel::runtime_effect::{EffectSink, RuntimeEffect};
13use crate::kernel::snapshot::{Snapshot, SnapshotStore};
14use crate::kernel::state::KernelState;
15use crate::kernel::step::{InterruptInfo, Next, StepFn};
16use crate::kernel::timeline;
17use crate::kernel::KernelError;
18
/// Standardized status of a run after run_until_blocked or resume.
///
/// This is the `Ok` payload of `Kernel::run_until_blocked` and `Kernel::resume`.
#[derive(Clone, Debug)]
pub enum RunStatus {
    /// Run completed successfully.
    Completed,
    /// Run is blocked (e.g. on interrupt or WaitSignal); can be resumed.
    Blocked(BlockedInfo),
    /// Run is still advancing (optional; used when yielding before blocking).
    // NOTE(review): the driver loop in this file never returns this variant.
    Running,
    /// Run failed; recoverable indicates whether resume/retry is possible.
    // NOTE(review): the driver loop in this file always reports `recoverable: false`.
    Failed { recoverable: bool },
}
31
/// Details attached to `RunStatus::Blocked` describing what the run is waiting on.
#[derive(Clone, Debug)]
pub struct BlockedInfo {
    /// Interrupt returned by the step function, when an interrupt is what blocked the run.
    pub interrupt: Option<InterruptInfo>,
    /// Presumably the name of the signal being awaited — TODO confirm; the driver in
    /// this file always sets it to `None`.
    pub wait_signal: Option<String>,
}
37
/// Signal to resume a blocked run (e.g. human approval, external event).
///
/// Passed to `Kernel::resume`, which records the carried value in a `Resumed` event.
#[derive(Clone, Debug)]
pub enum Signal {
    /// Resume with a plain value.
    Resume(serde_json::Value),
    /// Resume with a named signal. `Kernel::resume` only records `value`;
    /// `name` is not written to the event log.
    Signal {
        name: String,
        value: serde_json::Value,
    },
}
47
/// Kernel: event store, optional snapshot store, reducer, executor, step fn, policy, optional effect sink, execution mode.
///
/// The driver methods on this type replay the event log into a state `S`, ask `step`
/// what to do next, and append the resulting events back to `events`.
pub struct Kernel<S: KernelState> {
    /// Event log: the driver appends to it and scans it to rebuild state.
    pub events: Box<dyn EventStore>,
    /// Optional snapshot store; only consulted by `replay_from_snapshot`.
    pub snaps: Option<Box<dyn SnapshotStore<S>>>,
    /// Applies each sequenced event to the state.
    pub reducer: Box<dyn Reducer<S>>,
    /// Executes the action of a `Next::Do` step; never called by the replay methods.
    pub exec: Box<dyn ActionExecutor>,
    /// Decides the next move (emit events, do an action, interrupt, complete) from the current state.
    pub step: Box<dyn StepFn<S>>,
    /// Authorizes actions before execution and decides whether executor errors are retried.
    pub policy: Box<dyn Policy>,
    /// When set, every runtime effect (LLM/tool call, state write, interrupt) is recorded here.
    pub effect_sink: Option<Box<dyn EffectSink>>,
    /// Execution mode: Normal, Record, Replay, or Verify. Replay/Verify trap clock, randomness, spawn.
    pub mode: KernelMode,
}
61
62impl<S: KernelState> Kernel<S> {
63    /// Returns a determinism guard for the current mode. Use before clock, randomness, or spawn in Replay/Verify.
64    pub fn determinism_guard(&self) -> DeterminismGuard {
65        DeterminismGuard::new(self.mode)
66    }
67
68    /// Runs until the step returns Complete or Interrupt/WaitSignal. Returns run status.
69    /// State is obtained by replaying the event log (or initial_state if the run has no events yet).
70    pub fn run_until_blocked(
71        &self,
72        run_id: &RunId,
73        initial_state: S,
74    ) -> Result<RunStatus, KernelError> {
75        self.run_loop(run_id, initial_state)
76    }
77
78    /// Resumes a blocked run with a signal (e.g. resume value or external signal).
79    /// Appends Resumed (or Signal) event, then runs until blocked or complete.
80    pub fn resume(
81        &self,
82        run_id: &RunId,
83        initial_state: S,
84        signal: Signal,
85    ) -> Result<RunStatus, KernelError> {
86        let value = match &signal {
87            Signal::Resume(v) => v.clone(),
88            Signal::Signal { value, .. } => value.clone(),
89        };
90        self.events.append(run_id, &[Event::Resumed { value }])?;
91        self.run_loop(run_id, initial_state)
92    }
93
94    /// Inner loop: replay to get state, then step until Complete or Blocked.
95    fn run_loop(&self, run_id: &RunId, initial_state: S) -> Result<RunStatus, KernelError> {
96        const FROM_SEQ: Seq = 1;
97        let mut state = initial_state;
98        let sequenced = self.events.scan(run_id, FROM_SEQ)?;
99        for se in sequenced {
100            self.reducer.apply(&mut state, &se)?;
101        }
102
103        loop {
104            let next = self.step.next(&state)?;
105            match next {
106                Next::Emit(evs) => {
107                    if let Some(sink) = &self.effect_sink {
108                        for ev in &evs {
109                            if let Event::StateUpdated { step_id, payload } = ev {
110                                sink.record(
111                                    run_id,
112                                    &RuntimeEffect::StateWrite {
113                                        step_id: step_id.clone(),
114                                        payload: payload.clone(),
115                                    },
116                                );
117                            }
118                        }
119                    }
120                    if !evs.is_empty() {
121                        let before = self.events.head(run_id)?;
122                        self.events.append(run_id, &evs)?;
123                        let new_events = self.events.scan(run_id, before + 1)?;
124                        for se in new_events {
125                            self.reducer.apply(&mut state, &se)?;
126                        }
127                    }
128                }
129                Next::Do(action) => {
130                    if let Some(sink) = &self.effect_sink {
131                        match &action {
132                            Action::CallLLM { provider, input } => {
133                                sink.record(
134                                    run_id,
135                                    &RuntimeEffect::LLMCall {
136                                        provider: provider.clone(),
137                                        input: input.clone(),
138                                    },
139                                );
140                            }
141                            Action::CallTool { tool, input } => {
142                                sink.record(
143                                    run_id,
144                                    &RuntimeEffect::ToolCall {
145                                        tool: tool.clone(),
146                                        input: input.clone(),
147                                    },
148                                );
149                            }
150                            _ => {}
151                        }
152                    }
153                    self.policy
154                        .authorize(run_id, &action, &PolicyCtx::default())?;
155                    let before = self.events.head(run_id)?;
156                    let action_id = format!("{}-{}", run_id, before + 1);
157                    let payload = serde_json::to_value(&action)
158                        .map_err(|e| KernelError::Driver(e.to_string()))?;
159                    self.events.append(
160                        run_id,
161                        &[Event::ActionRequested {
162                            action_id: action_id.clone(),
163                            payload,
164                        }],
165                    )?;
166                    let result = self.exec.execute(run_id, &action);
167                    match result {
168                        Ok(ActionResult::Success(output)) => {
169                            self.events.append(
170                                run_id,
171                                &[Event::ActionSucceeded {
172                                    action_id: action_id.clone(),
173                                    output,
174                                }],
175                            )?;
176                        }
177                        Ok(ActionResult::Failure(error)) => {
178                            self.events
179                                .append(run_id, &[Event::ActionFailed { action_id, error }])?;
180                            return Ok(RunStatus::Failed { recoverable: false });
181                        }
182                        Err(mut e) => {
183                            let mut attempt = 0u32;
184                            loop {
185                                let action_err = ActionError::from_kernel_error(&e);
186                                let decision = self.policy.retry_strategy_attempt(
187                                    &action_err,
188                                    &action,
189                                    attempt,
190                                );
191                                match decision {
192                                    RetryDecision::Fail => {
193                                        self.events.append(
194                                            run_id,
195                                            &[Event::ActionFailed {
196                                                action_id: action_id.clone(),
197                                                error: e.to_string(),
198                                            }],
199                                        )?;
200                                        return Ok(RunStatus::Failed { recoverable: false });
201                                    }
202                                    RetryDecision::Retry | RetryDecision::RetryAfterMs(0) => {}
203                                    RetryDecision::RetryAfterMs(ms) => {
204                                        std::thread::sleep(Duration::from_millis(ms));
205                                    }
206                                }
207                                attempt += 1;
208                                match self.exec.execute(run_id, &action) {
209                                    Ok(ActionResult::Success(output)) => {
210                                        self.events.append(
211                                            run_id,
212                                            &[Event::ActionSucceeded {
213                                                action_id: action_id.clone(),
214                                                output,
215                                            }],
216                                        )?;
217                                        break;
218                                    }
219                                    Ok(ActionResult::Failure(error)) => {
220                                        self.events.append(
221                                            run_id,
222                                            &[Event::ActionFailed {
223                                                action_id: action_id.clone(),
224                                                error,
225                                            }],
226                                        )?;
227                                        return Ok(RunStatus::Failed { recoverable: false });
228                                    }
229                                    Err(e2) => e = e2,
230                                }
231                            }
232                        }
233                    }
234                    let new_events = self.events.scan(run_id, before + 1)?;
235                    for se in new_events {
236                        self.reducer.apply(&mut state, &se)?;
237                    }
238                }
239                Next::Interrupt(info) => {
240                    if let Some(sink) = &self.effect_sink {
241                        sink.record(
242                            run_id,
243                            &RuntimeEffect::InterruptRaise {
244                                value: info.value.clone(),
245                            },
246                        );
247                    }
248                    self.events.append(
249                        run_id,
250                        &[Event::Interrupted {
251                            value: info.value.clone(),
252                        }],
253                    )?;
254                    return Ok(RunStatus::Blocked(BlockedInfo {
255                        interrupt: Some(info),
256                        wait_signal: None,
257                    }));
258                }
259                Next::Complete => {
260                    self.events.append(run_id, &[Event::Completed])?;
261                    return Ok(RunStatus::Completed);
262                }
263            }
264        }
265    }
266
267    /// Replays the run from the event log without executing external actions; returns final state.
268    ///
269    /// Scans all events for the run (from seq 1), applies each with the Reducer in order.
270    /// Does not call ActionExecutor; any ActionRequested is satisfied by the following
271    /// ActionSucceeded/ActionFailed already stored in the log (reducer applies them).
272    pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
273        self.replay_from(run_id, initial_state, None)
274    }
275
276    /// Replays from a snapshot if available, otherwise from initial state.
277    ///
278    /// If `snaps` is set and `load_latest(run_id)` returns a snapshot, state starts at
279    /// `snap.state` and only events with seq > snap.at_seq are applied; otherwise
280    /// starts at `initial_state` and replays from seq 1.
281    pub fn replay_from_snapshot(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
282        let from_snap = self
283            .snaps
284            .as_ref()
285            .and_then(|s| s.load_latest(run_id).ok().flatten());
286        self.replay_from(run_id, initial_state, from_snap.as_ref())
287    }
288
289    /// Replay-only: no executor or step is called; recorded outputs are applied from the event log.
290    fn replay_from(
291        &self,
292        run_id: &RunId,
293        initial_state: S,
294        snapshot: Option<&Snapshot<S>>,
295    ) -> Result<S, KernelError> {
296        const FROM_SEQ: Seq = 1;
297        let (mut state, from_seq) = match snapshot {
298            Some(snap) => (snap.state.clone(), snap.at_seq + 1),
299            None => (initial_state, FROM_SEQ),
300        };
301        let sequenced = self.events.scan(run_id, from_seq)?;
302        for se in sequenced {
303            self.reducer.apply(&mut state, &se)?;
304        }
305        Ok(state)
306    }
307
308    /// Builds a run timeline (event list + final status) for audit/debugging. Serialize to JSON for UI/CLI.
309    pub fn run_timeline(&self, run_id: &RunId) -> Result<timeline::RunTimeline, KernelError> {
310        timeline::run_timeline(self.events.as_ref(), run_id)
311    }
312}
313
314#[cfg(test)]
315mod tests {
316    use super::*;
317    use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
318    use crate::kernel::event::Event;
319    use crate::kernel::event_store::{InMemoryEventStore, SharedEventStore};
320    use crate::kernel::policy::RetryWithBackoffPolicy;
321    use crate::kernel::runtime_effect::RuntimeEffect;
322    use crate::kernel::stubs::{AllowAllPolicy, NoopActionExecutor, NoopStepFn};
323    use crate::kernel::StateUpdatedOnlyReducer;
324    use serde::{Deserialize, Serialize};
325    use std::sync::atomic::{AtomicUsize, Ordering};
326    use std::sync::{Arc, Mutex};
327
328    #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
329    struct TestState(u32);
330    impl KernelState for TestState {
331        fn version(&self) -> u32 {
332            1
333        }
334    }
335
336    /// Mock executor that counts execute() calls; used to assert replay does not call executor.
337    struct CountingActionExecutor(Arc<AtomicUsize>);
338    impl CountingActionExecutor {
339        fn new(counter: Arc<AtomicUsize>) -> Self {
340            Self(counter)
341        }
342        fn count(&self) -> usize {
343            self.0.load(Ordering::SeqCst)
344        }
345    }
346    impl ActionExecutor for CountingActionExecutor {
347        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
348            self.0.fetch_add(1, Ordering::SeqCst);
349            Err(KernelError::Driver("mock".into()))
350        }
351    }
352
353    /// Sink that collects effects into a shared Vec for tests.
354    struct CollectingEffectSink(Arc<Mutex<Vec<(String, RuntimeEffect)>>>);
355    impl crate::kernel::runtime_effect::EffectSink for CollectingEffectSink {
356        fn record(&self, run_id: &RunId, effect: &RuntimeEffect) {
357            self.0
358                .lock()
359                .unwrap()
360                .push((run_id.clone(), effect.clone()));
361        }
362    }
363
364    /// Step that emits one StateUpdated then completes (for effect-capture test).
365    struct EmitOnceThenCompleteStep(AtomicUsize);
366    impl StepFn<TestState> for EmitOnceThenCompleteStep {
367        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
368            if self.0.fetch_add(1, Ordering::SeqCst) == 0 {
369                Ok(Next::Emit(vec![Event::StateUpdated {
370                    step_id: Some("node1".into()),
371                    payload: serde_json::to_value(&TestState(1)).unwrap(),
372                }]))
373            } else {
374                Ok(Next::Complete)
375            }
376        }
377    }
378
379    #[test]
380    fn run_until_blocked_complete() {
381        let k = Kernel::<TestState> {
382            events: Box::new(InMemoryEventStore::new()),
383            snaps: None,
384            reducer: Box::new(StateUpdatedOnlyReducer),
385            exec: Box::new(NoopActionExecutor),
386            step: Box::new(NoopStepFn),
387            policy: Box::new(AllowAllPolicy),
388            effect_sink: None,
389            mode: KernelMode::Normal,
390        };
391        let run_id = "run-complete".to_string();
392        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
393        assert!(matches!(status, RunStatus::Completed));
394    }
395
396    #[test]
397    fn kernel_replay_mode_determinism_guard_traps_clock() {
398        let k = Kernel::<TestState> {
399            events: Box::new(InMemoryEventStore::new()),
400            snaps: None,
401            reducer: Box::new(StateUpdatedOnlyReducer),
402            exec: Box::new(NoopActionExecutor),
403            step: Box::new(NoopStepFn),
404            policy: Box::new(AllowAllPolicy),
405            effect_sink: None,
406            mode: KernelMode::Replay,
407        };
408        let guard = k.determinism_guard();
409        let err = guard.check_clock_access().unwrap_err();
410        assert!(err.to_string().contains("Replay"));
411    }
412
413    #[test]
414    fn effect_sink_captures_state_write_and_complete() {
415        let effects: Arc<Mutex<Vec<(String, RuntimeEffect)>>> = Arc::new(Mutex::new(Vec::new()));
416        let sink = CollectingEffectSink(Arc::clone(&effects));
417        let k = Kernel::<TestState> {
418            events: Box::new(InMemoryEventStore::new()),
419            snaps: None,
420            reducer: Box::new(StateUpdatedOnlyReducer),
421            exec: Box::new(NoopActionExecutor),
422            step: Box::new(EmitOnceThenCompleteStep(AtomicUsize::new(0))),
423            policy: Box::new(AllowAllPolicy),
424            effect_sink: Some(Box::new(sink)),
425            mode: KernelMode::Normal,
426        };
427        let run_id = "run-effect-capture".to_string();
428        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
429        assert!(matches!(status, RunStatus::Completed));
430        let recorded = effects.lock().unwrap();
431        assert_eq!(
432            recorded.len(),
433            1,
434            "one effect (StateWrite) should be captured"
435        );
436        match &recorded[0].1 {
437            RuntimeEffect::StateWrite { step_id, payload } => {
438                assert_eq!(step_id.as_deref(), Some("node1"));
439                let s: TestState = serde_json::from_value(payload.clone()).unwrap();
440                assert_eq!(s.0, 1);
441            }
442            _ => panic!("expected StateWrite"),
443        }
444    }
445
446    #[test]
447    fn run_timeline_after_complete_has_events_and_json() {
448        let store = InMemoryEventStore::new();
449        let k = Kernel::<TestState> {
450            events: Box::new(store),
451            snaps: None,
452            reducer: Box::new(StateUpdatedOnlyReducer),
453            exec: Box::new(NoopActionExecutor),
454            step: Box::new(NoopStepFn),
455            policy: Box::new(AllowAllPolicy),
456            effect_sink: None,
457            mode: KernelMode::Normal,
458        };
459        let run_id = "timeline-run".to_string();
460        let _ = k.run_until_blocked(&run_id, TestState(0)).unwrap();
461        let tl = k.run_timeline(&run_id).unwrap();
462        assert_eq!(tl.run_id, run_id);
463        let kinds: Vec<&str> = tl.events.iter().map(|e| e.kind.as_str()).collect();
464        assert!(
465            kinds.contains(&"Completed"),
466            "timeline should contain Completed"
467        );
468        let json = serde_json::to_string(&tl).unwrap();
469        assert!(json.contains("Completed"));
470    }
471
472    struct InterruptOnceStep(bool);
473    impl StepFn<TestState> for InterruptOnceStep {
474        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
475            if !self.0 {
476                Ok(Next::Interrupt(InterruptInfo {
477                    value: serde_json::json!("pause"),
478                }))
479            } else {
480                Ok(Next::Complete)
481            }
482        }
483    }
484
485    #[test]
486    fn run_until_blocked_then_resume() {
487        let inner = Arc::new(InMemoryEventStore::new());
488        let events = Box::new(SharedEventStore(inner.clone()));
489        let run_id = "run-interrupt-resume".to_string();
490        let k = Kernel::<TestState> {
491            events,
492            snaps: None,
493            reducer: Box::new(StateUpdatedOnlyReducer),
494            exec: Box::new(NoopActionExecutor),
495            step: Box::new(InterruptOnceStep(false)),
496            policy: Box::new(AllowAllPolicy),
497            effect_sink: None,
498            mode: KernelMode::Normal,
499        };
500        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
501        assert!(matches!(status, RunStatus::Blocked(_)));
502
503        let k2 = Kernel::<TestState> {
504            events: Box::new(SharedEventStore(inner)),
505            snaps: None,
506            reducer: Box::new(StateUpdatedOnlyReducer),
507            exec: Box::new(NoopActionExecutor),
508            step: Box::new(InterruptOnceStep(true)),
509            policy: Box::new(AllowAllPolicy),
510            effect_sink: None,
511            mode: KernelMode::Normal,
512        };
513        let status2 = k2
514            .resume(&run_id, TestState(0), Signal::Resume(serde_json::json!(1)))
515            .unwrap();
516        assert!(matches!(status2, RunStatus::Completed));
517    }
518
519    /// Replay must not call ActionExecutor (0 side effects).
520    #[test]
521    fn replay_no_side_effects() {
522        let exec_count = Arc::new(AtomicUsize::new(0));
523        let store = InMemoryEventStore::new();
524        let run_id = "replay-no-effects".to_string();
525        store
526            .append(
527                &run_id,
528                &[
529                    Event::StateUpdated {
530                        step_id: Some("n1".into()),
531                        payload: serde_json::json!(42),
532                    },
533                    Event::Completed,
534                ],
535            )
536            .unwrap();
537        let k = Kernel::<TestState> {
538            events: Box::new(store),
539            snaps: None,
540            reducer: Box::new(StateUpdatedOnlyReducer),
541            exec: Box::new(CountingActionExecutor::new(Arc::clone(&exec_count))),
542            step: Box::new(NoopStepFn),
543            policy: Box::new(AllowAllPolicy),
544            effect_sink: None,
545            mode: KernelMode::Normal,
546        };
547        let _ = k.replay(&run_id, TestState(0)).unwrap();
548        assert_eq!(
549            exec_count.load(Ordering::SeqCst),
550            0,
551            "replay must not call executor"
552        );
553    }
554
555    /// Same event log and initial state must yield identical state on multiple replays.
556    #[test]
557    fn replay_state_equivalence() {
558        let store = InMemoryEventStore::new();
559        let run_id = "replay-equiv".to_string();
560        store
561            .append(
562                &run_id,
563                &[
564                    Event::StateUpdated {
565                        step_id: Some("a".into()),
566                        payload: serde_json::json!(10),
567                    },
568                    Event::StateUpdated {
569                        step_id: Some("b".into()),
570                        payload: serde_json::json!(20),
571                    },
572                    Event::Completed,
573                ],
574            )
575            .unwrap();
576        let k = Kernel::<TestState> {
577            events: Box::new(SharedEventStore(Arc::new(store))),
578            snaps: None,
579            reducer: Box::new(StateUpdatedOnlyReducer),
580            exec: Box::new(NoopActionExecutor),
581            step: Box::new(NoopStepFn),
582            policy: Box::new(AllowAllPolicy),
583            effect_sink: None,
584            mode: KernelMode::Normal,
585        };
586        let initial = TestState(0);
587        let s1 = k.replay(&run_id, initial.clone()).unwrap();
588        let s2 = k.replay(&run_id, initial).unwrap();
589        assert_eq!(s1, s2, "same log and initial state must yield equal state");
590        assert_eq!(s1.0, 20);
591    }
592
593    /// Executor that fails with Transient up to N times then succeeds (for retry integration test).
594    struct TransientThenSuccessExecutor {
595        fail_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
596        fail_up_to: usize,
597    }
598    impl TransientThenSuccessExecutor {
599        fn new(fail_up_to: usize) -> Self {
600            Self {
601                fail_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
602                fail_up_to,
603            }
604        }
605    }
606    impl ActionExecutor for TransientThenSuccessExecutor {
607        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
608            let n = self
609                .fail_count
610                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
611            if n < self.fail_up_to {
612                Err(KernelError::Executor(ActionError::transient("transient")))
613            } else {
614                Ok(ActionResult::Success(serde_json::json!("ok")))
615            }
616        }
617    }
618
619    #[test]
620    fn run_until_blocked_retry_then_success() {
621        use crate::kernel::policy::RetryWithBackoffPolicy;
622        let store = Arc::new(InMemoryEventStore::new());
623        let run_id = "run-retry-ok".to_string();
624        let k = Kernel::<TestState> {
625            events: Box::new(SharedEventStore(Arc::clone(&store))),
626            snaps: None,
627            reducer: Box::new(StateUpdatedOnlyReducer),
628            exec: Box::new(TransientThenSuccessExecutor::new(2)),
629            step: Box::new(DoOnceThenCompleteStep::new()),
630            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
631            effect_sink: None,
632            mode: KernelMode::Normal,
633        };
634        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
635        assert!(matches!(status, RunStatus::Completed));
636        let events = store.scan(&run_id, 1).unwrap();
637        let succeeded = events
638            .iter()
639            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
640            .count();
641        let failed = events
642            .iter()
643            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
644            .count();
645        assert_eq!(succeeded, 1, "exactly one ActionSucceeded after retries");
646        assert_eq!(failed, 0, "no ActionFailed when retries eventually succeed");
647    }
648
649    /// Failure path: executor returns Err → RunStatus::Failed and event log has ActionFailed for that action_id.
650    #[test]
651    fn run_until_blocked_failure_recovery() {
652        let store = Arc::new(InMemoryEventStore::new());
653        let run_id = "run-fail".to_string();
654        let k = Kernel::<TestState> {
655            events: Box::new(SharedEventStore(Arc::clone(&store))),
656            snaps: None,
657            reducer: Box::new(StateUpdatedOnlyReducer),
658            exec: Box::new(CountingActionExecutor::new(Arc::new(AtomicUsize::new(0)))),
659            step: Box::new(DoOnceThenCompleteStep::new()),
660            policy: Box::new(AllowAllPolicy),
661            effect_sink: None,
662            mode: KernelMode::Normal,
663        };
664        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
665        assert!(
666            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
667            "default policy says Fail so recoverable should be false"
668        );
669        let events = store.scan(&run_id, 1).unwrap();
670        let has_requested = events
671            .iter()
672            .any(|e| matches!(e.event, Event::ActionRequested { .. }));
673        let has_failed = events
674            .iter()
675            .any(|e| matches!(e.event, Event::ActionFailed { .. }));
676        assert!(
677            has_requested && has_failed,
678            "event log must contain ActionRequested and ActionFailed for the same action"
679        );
680    }
681
682    /// Step that returns Next::Do once then Next::Complete so the driver executes one action then finishes.
683    struct DoOnceThenCompleteStep {
684        called: std::sync::Arc<std::sync::atomic::AtomicUsize>,
685    }
686    impl DoOnceThenCompleteStep {
687        fn new() -> Self {
688            Self {
689                called: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
690            }
691        }
692    }
693    impl StepFn<TestState> for DoOnceThenCompleteStep {
694        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
695            let n = self
696                .called
697                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
698            if n == 0 {
699                Ok(Next::Do(Action::CallTool {
700                    tool: "dummy".into(),
701                    input: serde_json::json!(null),
702                }))
703            } else {
704                Ok(Next::Complete)
705            }
706        }
707    }
708
709    /// Step that does one action first, then completes.
710    struct DoThenCompleteStep {
711        called: AtomicUsize,
712    }
713    impl DoThenCompleteStep {
714        fn new() -> Self {
715            Self {
716                called: AtomicUsize::new(0),
717            }
718        }
719    }
720    impl StepFn<TestState> for DoThenCompleteStep {
721        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
722            let prev = self.called.fetch_add(1, Ordering::SeqCst);
723            if prev == 0 {
724                Ok(Next::Do(Action::CallTool {
725                    tool: "dummy".into(),
726                    input: serde_json::json!(null),
727                }))
728            } else {
729                Ok(Next::Complete)
730            }
731        }
732    }
733
734    /// Executor that returns pre-scripted results in order.
735    struct ScriptedActionExecutor {
736        responses: Mutex<Vec<Result<ActionResult, KernelError>>>,
737        calls: AtomicUsize,
738    }
739    impl ScriptedActionExecutor {
740        fn new(responses: Vec<Result<ActionResult, KernelError>>) -> Self {
741            Self {
742                responses: Mutex::new(responses),
743                calls: AtomicUsize::new(0),
744            }
745        }
746        fn calls(&self) -> usize {
747            self.calls.load(Ordering::SeqCst)
748        }
749    }
750    impl ActionExecutor for ScriptedActionExecutor {
751        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
752            self.calls.fetch_add(1, Ordering::SeqCst);
753            let mut guard = self.responses.lock().unwrap();
754            if guard.is_empty() {
755                return Err(KernelError::Driver("missing scripted response".into()));
756            }
757            guard.remove(0)
758        }
759    }
760
/// Replaying from a snapshot taken at seq 2 must end in the state carried by the
/// events that follow it.
#[test]
fn replay_from_snapshot_applies_tail_only() {
    use crate::kernel::{InMemorySnapshotStore, Snapshot};

    let run_id = "replay-snap".to_string();

    // Event log: three state updates (10, 20, 30) followed by the completion marker.
    let history = [
        Event::StateUpdated {
            step_id: Some("a".into()),
            payload: serde_json::json!(10),
        },
        Event::StateUpdated {
            step_id: Some("b".into()),
            payload: serde_json::json!(20),
        },
        Event::StateUpdated {
            step_id: Some("c".into()),
            payload: serde_json::json!(30),
        },
        Event::Completed,
    ];
    let event_log = InMemoryEventStore::new();
    event_log.append(&run_id, &history).unwrap();

    // Snapshot recorded at seq 2, holding state 20.
    let snapshot_store = InMemorySnapshotStore::new();
    let checkpoint = Snapshot {
        run_id: run_id.clone(),
        at_seq: 2,
        state: TestState(20),
    };
    snapshot_store.save(&checkpoint).unwrap();

    let kernel = Kernel::<TestState> {
        events: Box::new(event_log),
        snaps: Some(Box::new(snapshot_store)),
        reducer: Box::new(StateUpdatedOnlyReducer),
        exec: Box::new(NoopActionExecutor),
        step: Box::new(NoopStepFn),
        policy: Box::new(AllowAllPolicy),
        effect_sink: None,
        mode: KernelMode::Normal,
    };

    // The caller-supplied initial state is 0; the expected result is the last update.
    let replayed = kernel.replay_from_snapshot(&run_id, TestState(0)).unwrap();
    assert_eq!(replayed.0, 30, "only events after at_seq=2 (seq 3) applied");
}
810
/// An executor that returns `ActionResult::Failure` must fail the run as
/// non-recoverable and leave exactly one `ActionFailed` event (and no
/// `ActionSucceeded`) whose id matches the preceding `ActionRequested`.
#[test]
fn action_result_failure_returns_failed_and_single_terminal_event() {
    let run_id = "run-action-failure".to_string();
    // Shared handle so the log can be inspected after the kernel takes its boxed copy.
    let store = Arc::new(InMemoryEventStore::new());
    let scripted = ScriptedActionExecutor::new(vec![Ok(ActionResult::Failure("boom".into()))]);
    let kernel = Kernel::<TestState> {
        events: Box::new(SharedEventStore(Arc::clone(&store))),
        snaps: None,
        reducer: Box::new(StateUpdatedOnlyReducer),
        exec: Box::new(scripted),
        step: Box::new(DoThenCompleteStep::new()),
        policy: Box::new(AllowAllPolicy),
        effect_sink: None,
        mode: KernelMode::Normal,
    };

    let status = kernel.run_until_blocked(&run_id, TestState(0)).unwrap();
    assert!(
        matches!(status, RunStatus::Failed { recoverable: false }),
        "ActionResult::Failure must fail the run"
    );

    let events = store.scan(&run_id, 1).unwrap();
    // Id of the most recent ActionRequested seen while walking the log in order.
    let mut last_requested: Option<&str> = None;
    let (mut succeeded, mut failed) = (0usize, 0usize);
    for record in events.iter() {
        if let Event::ActionRequested { action_id, .. } = &record.event {
            last_requested = Some(action_id.as_str());
        } else if let Event::ActionSucceeded { .. } = &record.event {
            succeeded += 1;
        } else if let Event::ActionFailed { action_id, .. } = &record.event {
            failed += 1;
            assert_eq!(
                last_requested,
                Some(action_id.as_str()),
                "failed event must match requested action_id"
            );
        }
    }
    assert_eq!(succeeded, 0);
    assert_eq!(failed, 1, "only one terminal failure event is expected");
}
859
/// Two scripted transient errors followed by a success, under a retry policy built
/// with `(AllowAllPolicy, 3, 0)`: the run must complete after three executor calls
/// and log one `ActionRequested`, one `ActionSucceeded` and no `ActionFailed`.
#[test]
fn retry_then_success_has_single_terminal_success_event() {
    let run_id = "run-retry-success".to_string();
    let store = Arc::new(InMemoryEventStore::new());
    let script = vec![
        Err(KernelError::Executor(ActionError::transient("transient-1"))),
        Err(KernelError::Executor(ActionError::transient("transient-2"))),
        Ok(ActionResult::Success(serde_json::json!("ok"))),
    ];
    // Kept behind an Arc so the call count can be read after the kernel owns its handle.
    let exec = Arc::new(ScriptedActionExecutor::new(script));
    let kernel = Kernel::<TestState> {
        events: Box::new(SharedEventStore(Arc::clone(&store))),
        snaps: None,
        reducer: Box::new(StateUpdatedOnlyReducer),
        exec: Box::new(ArcExecutor(Arc::clone(&exec))),
        step: Box::new(DoThenCompleteStep::new()),
        policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
        effect_sink: None,
        mode: KernelMode::Normal,
    };

    let status = kernel.run_until_blocked(&run_id, TestState(0)).unwrap();
    assert!(matches!(status, RunStatus::Completed));
    assert_eq!(exec.calls(), 3, "executor should be called for retries");

    // Tally the action lifecycle events in a single pass over the log.
    let events = store.scan(&run_id, 1).unwrap();
    let (mut requested, mut succeeded, mut failed) = (0usize, 0usize, 0usize);
    for record in events.iter() {
        match &record.event {
            Event::ActionRequested { .. } => requested += 1,
            Event::ActionSucceeded { .. } => succeeded += 1,
            Event::ActionFailed { .. } => failed += 1,
            _ => {}
        }
    }
    assert_eq!(requested, 1, "retry must not duplicate ActionRequested");
    assert_eq!(succeeded, 1, "exactly one terminal success event expected");
    assert_eq!(failed, 0, "success path must not emit ActionFailed");
}
907
/// Three scripted transient errors under a retry policy built with
/// `(AllowAllPolicy, 1, 0)`: the run must fail as non-recoverable after two
/// executor calls and log one `ActionRequested`, one `ActionFailed` and no
/// `ActionSucceeded`.
#[test]
fn retry_exhausted_has_single_terminal_failed_event() {
    let run_id = "run-retry-fail".to_string();
    let store = Arc::new(InMemoryEventStore::new());
    let script = vec![
        Err(KernelError::Executor(ActionError::transient("transient-1"))),
        Err(KernelError::Executor(ActionError::transient("transient-2"))),
        Err(KernelError::Executor(ActionError::transient("transient-3"))),
    ];
    // Kept behind an Arc so the call count can be read after the kernel owns its handle.
    let exec = Arc::new(ScriptedActionExecutor::new(script));
    let kernel = Kernel::<TestState> {
        events: Box::new(SharedEventStore(Arc::clone(&store))),
        snaps: None,
        reducer: Box::new(StateUpdatedOnlyReducer),
        exec: Box::new(ArcExecutor(Arc::clone(&exec))),
        step: Box::new(DoThenCompleteStep::new()),
        policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 1, 0)),
        effect_sink: None,
        mode: KernelMode::Normal,
    };

    let status = kernel.run_until_blocked(&run_id, TestState(0)).unwrap();
    assert!(
        matches!(status, RunStatus::Failed { recoverable: false }),
        "exhausted retries should fail run"
    );
    assert_eq!(exec.calls(), 2, "max_retries=1 should execute twice");

    // Fold the log into (requested, succeeded, failed) counters in one pass.
    let events = store.scan(&run_id, 1).unwrap();
    let (requested, succeeded, failed) = events.iter().fold(
        (0usize, 0usize, 0usize),
        |(req, ok, err), record| match &record.event {
            Event::ActionRequested { .. } => (req + 1, ok, err),
            Event::ActionSucceeded { .. } => (req, ok + 1, err),
            Event::ActionFailed { .. } => (req, ok, err + 1),
            _ => (req, ok, err),
        },
    );
    assert_eq!(requested, 1, "retry must not duplicate ActionRequested");
    assert_eq!(succeeded, 0);
    assert_eq!(failed, 1, "exactly one terminal failed event expected");
}
958
959    struct ArcExecutor(Arc<ScriptedActionExecutor>);
960    impl ActionExecutor for ArcExecutor {
961        fn execute(&self, run_id: &RunId, action: &Action) -> Result<ActionResult, KernelError> {
962            self.0.execute(run_id, action)
963        }
964    }
965}