Skip to main content

oris_kernel/kernel/
driver.rs

1//! Kernel driver: run_until_blocked, resume, replay.
2
3use std::time::Duration;
4
5use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
6use crate::kernel::determinism_guard::DeterminismGuard;
7use crate::kernel::event::{Event, EventStore, SequencedEvent};
8use crate::kernel::identity::{RunId, Seq};
9use crate::kernel::kernel_mode::KernelMode;
10use crate::kernel::policy::{Policy, PolicyCtx, RetryDecision};
11use crate::kernel::reducer::Reducer;
12use crate::kernel::runtime_effect::{EffectSink, RuntimeEffect};
13use crate::kernel::snapshot::{Snapshot, SnapshotStore};
14use crate::kernel::state::KernelState;
15use crate::kernel::step::{InterruptInfo, Next, StepFn};
16use crate::kernel::timeline;
17use crate::kernel::KernelError;
18
19/// Standardized status of a run after run_until_blocked or resume.
20#[derive(Clone, Debug)]
21pub enum RunStatus {
22    /// Run completed successfully.
23    Completed,
24    /// Run is blocked (e.g. on interrupt or WaitSignal); can be resumed.
25    Blocked(BlockedInfo),
26    /// Run is still advancing (optional; used when yielding before blocking).
27    Running,
28    /// Run failed; recoverable indicates whether resume/retry is possible.
29    Failed { recoverable: bool },
30}
31
32#[derive(Clone, Debug)]
33pub struct BlockedInfo {
34    pub interrupt: Option<InterruptInfo>,
35    pub wait_signal: Option<String>,
36}
37
38/// Signal to resume a blocked run (e.g. human approval, external event).
39#[derive(Clone, Debug)]
40pub enum Signal {
41    Resume(serde_json::Value),
42    Signal {
43        name: String,
44        value: serde_json::Value,
45    },
46}
47
48/// Kernel: event store, optional snapshot store, reducer, executor, step fn, policy, optional effect sink, execution mode.
49pub struct Kernel<S: KernelState> {
50    pub events: Box<dyn EventStore>,
51    pub snaps: Option<Box<dyn SnapshotStore<S>>>,
52    pub reducer: Box<dyn Reducer<S>>,
53    pub exec: Box<dyn ActionExecutor>,
54    pub step: Box<dyn StepFn<S>>,
55    pub policy: Box<dyn Policy>,
56    /// When set, every runtime effect (LLM/tool call, state write, interrupt) is recorded here.
57    pub effect_sink: Option<Box<dyn EffectSink>>,
58    /// Execution mode: Normal, Record, Replay, or Verify. Replay/Verify trap clock, randomness, spawn.
59    pub mode: KernelMode,
60}
61
62impl<S: KernelState> Kernel<S> {
63    /// Returns a determinism guard for the current mode. Use before clock, randomness, or spawn in Replay/Verify.
64    pub fn determinism_guard(&self) -> DeterminismGuard {
65        DeterminismGuard::new(self.mode)
66    }
67
68    /// Runs until the step returns Complete or Interrupt/WaitSignal. Returns run status.
69    /// State is obtained by replaying the event log (or initial_state if the run has no events yet).
70    pub fn run_until_blocked(
71        &self,
72        run_id: &RunId,
73        initial_state: S,
74    ) -> Result<RunStatus, KernelError> {
75        self.run_loop(run_id, initial_state)
76    }
77
78    /// Resumes a blocked run with a signal (e.g. resume value or external signal).
79    /// Appends Resumed (or Signal) event, then runs until blocked or complete.
80    pub fn resume(
81        &self,
82        run_id: &RunId,
83        initial_state: S,
84        signal: Signal,
85    ) -> Result<RunStatus, KernelError> {
86        let value = match &signal {
87            Signal::Resume(v) => v.clone(),
88            Signal::Signal { value, .. } => value.clone(),
89        };
90        self.events.append(run_id, &[Event::Resumed { value }])?;
91        self.run_loop(run_id, initial_state)
92    }
93
94    /// Inner loop: replay to get state, then step until Complete or Blocked.
95    fn run_loop(&self, run_id: &RunId, initial_state: S) -> Result<RunStatus, KernelError> {
96        let mut state = self.restore_state(run_id, initial_state)?;
97
98        loop {
99            let next = self.step.next(&state)?;
100            match next {
101                Next::Emit(evs) => {
102                    if let Some(sink) = &self.effect_sink {
103                        for ev in &evs {
104                            if let Event::StateUpdated { step_id, payload } = ev {
105                                sink.record(
106                                    run_id,
107                                    &RuntimeEffect::StateWrite {
108                                        step_id: step_id.clone(),
109                                        payload: payload.clone(),
110                                    },
111                                );
112                            }
113                        }
114                    }
115                    if !evs.is_empty() {
116                        self.append_and_apply(run_id, &mut state, &evs)?;
117                    }
118                }
119                Next::Do(action) => {
120                    if let Some(sink) = &self.effect_sink {
121                        match &action {
122                            Action::CallLLM { provider, input } => {
123                                sink.record(
124                                    run_id,
125                                    &RuntimeEffect::LLMCall {
126                                        provider: provider.clone(),
127                                        input: input.clone(),
128                                    },
129                                );
130                            }
131                            Action::CallTool { tool, input } => {
132                                sink.record(
133                                    run_id,
134                                    &RuntimeEffect::ToolCall {
135                                        tool: tool.clone(),
136                                        input: input.clone(),
137                                    },
138                                );
139                            }
140                            _ => {}
141                        }
142                    }
143                    self.policy
144                        .authorize(run_id, &action, &PolicyCtx::default())?;
145                    let before = self.events.head(run_id)?;
146                    let action_id = format!("{}-{}", run_id, before + 1);
147                    let payload = serde_json::to_value(&action)
148                        .map_err(|e| KernelError::Driver(e.to_string()))?;
149                    self.append_and_apply(
150                        run_id,
151                        &mut state,
152                        &[Event::ActionRequested {
153                            action_id: action_id.clone(),
154                            payload,
155                        }],
156                    )?;
157                    let result = self.exec.execute(run_id, &action);
158                    match result {
159                        Ok(ActionResult::Success(output)) => {
160                            self.append_and_apply(
161                                run_id,
162                                &mut state,
163                                &[Event::ActionSucceeded {
164                                    action_id: action_id.clone(),
165                                    output,
166                                }],
167                            )?;
168                        }
169                        Ok(ActionResult::Failure(error)) => {
170                            self.append_and_apply(
171                                run_id,
172                                &mut state,
173                                &[Event::ActionFailed { action_id, error }],
174                            )?;
175                            return Ok(RunStatus::Failed { recoverable: false });
176                        }
177                        Err(mut e) => {
178                            let mut attempt = 0u32;
179                            loop {
180                                let action_err = ActionError::from_kernel_error(&e);
181                                let decision = self.policy.retry_strategy_attempt(
182                                    &action_err,
183                                    &action,
184                                    attempt,
185                                );
186                                match decision {
187                                    RetryDecision::Fail => {
188                                        self.append_and_apply(
189                                            run_id,
190                                            &mut state,
191                                            &[Event::ActionFailed {
192                                                action_id: action_id.clone(),
193                                                error: e.to_string(),
194                                            }],
195                                        )?;
196                                        return Ok(RunStatus::Failed { recoverable: false });
197                                    }
198                                    RetryDecision::Retry | RetryDecision::RetryAfterMs(0) => {}
199                                    RetryDecision::RetryAfterMs(ms) => {
200                                        std::thread::sleep(Duration::from_millis(ms));
201                                    }
202                                }
203                                attempt += 1;
204                                match self.exec.execute(run_id, &action) {
205                                    Ok(ActionResult::Success(output)) => {
206                                        self.append_and_apply(
207                                            run_id,
208                                            &mut state,
209                                            &[Event::ActionSucceeded {
210                                                action_id: action_id.clone(),
211                                                output,
212                                            }],
213                                        )?;
214                                        break;
215                                    }
216                                    Ok(ActionResult::Failure(error)) => {
217                                        self.append_and_apply(
218                                            run_id,
219                                            &mut state,
220                                            &[Event::ActionFailed {
221                                                action_id: action_id.clone(),
222                                                error,
223                                            }],
224                                        )?;
225                                        return Ok(RunStatus::Failed { recoverable: false });
226                                    }
227                                    Err(e2) => e = e2,
228                                }
229                            }
230                        }
231                    }
232                }
233                Next::Interrupt(info) => {
234                    if let Some(sink) = &self.effect_sink {
235                        sink.record(
236                            run_id,
237                            &RuntimeEffect::InterruptRaise {
238                                value: info.value.clone(),
239                            },
240                        );
241                    }
242                    self.append_and_apply(
243                        run_id,
244                        &mut state,
245                        &[Event::Interrupted {
246                            value: info.value.clone(),
247                        }],
248                    )?;
249                    return Ok(RunStatus::Blocked(BlockedInfo {
250                        interrupt: Some(info),
251                        wait_signal: None,
252                    }));
253                }
254                Next::Complete => {
255                    self.append_and_apply(run_id, &mut state, &[Event::Completed])?;
256                    return Ok(RunStatus::Completed);
257                }
258            }
259        }
260    }
261
262    fn restore_state(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
263        const FROM_SEQ: Seq = 1;
264        let latest_snapshot = self.load_latest_snapshot(run_id)?;
265        let (mut state, from_seq) = match latest_snapshot {
266            Some(snapshot) => (snapshot.state, snapshot.at_seq + 1),
267            None => (initial_state, FROM_SEQ),
268        };
269        let sequenced = self.events.scan(run_id, from_seq)?;
270        self.apply_events(run_id, &mut state, sequenced)?;
271        Ok(state)
272    }
273
274    fn load_latest_snapshot(&self, run_id: &RunId) -> Result<Option<Snapshot<S>>, KernelError> {
275        match &self.snaps {
276            Some(store) => store.load_latest(run_id),
277            None => Ok(None),
278        }
279    }
280
281    fn append_and_apply(
282        &self,
283        run_id: &RunId,
284        state: &mut S,
285        events: &[Event],
286    ) -> Result<(), KernelError> {
287        if events.is_empty() {
288            return Ok(());
289        }
290        let before = self.events.head(run_id)?;
291        self.events.append(run_id, events)?;
292        let sequenced = self.events.scan(run_id, before + 1)?;
293        self.apply_events(run_id, state, sequenced)
294    }
295
296    fn apply_events(
297        &self,
298        run_id: &RunId,
299        state: &mut S,
300        sequenced: Vec<SequencedEvent>,
301    ) -> Result<(), KernelError> {
302        for se in sequenced {
303            self.reducer.apply(state, &se)?;
304            self.save_snapshot(run_id, se.seq, state)?;
305        }
306        Ok(())
307    }
308
309    fn save_snapshot(&self, run_id: &RunId, at_seq: Seq, state: &S) -> Result<(), KernelError> {
310        if let Some(store) = &self.snaps {
311            store.save(&Snapshot {
312                run_id: run_id.clone(),
313                at_seq,
314                state: state.clone(),
315            })?;
316        }
317        Ok(())
318    }
319
320    /// Replays the run from the event log without executing external actions; returns final state.
321    ///
322    /// Scans all events for the run (from seq 1), applies each with the Reducer in order.
323    /// Does not call ActionExecutor; any ActionRequested is satisfied by the following
324    /// ActionSucceeded/ActionFailed already stored in the log (reducer applies them).
325    pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
326        self.replay_from(run_id, initial_state, None)
327    }
328
329    /// Replays from a snapshot if available, otherwise from initial state.
330    ///
331    /// If `snaps` is set and `load_latest(run_id)` returns a snapshot, state starts at
332    /// `snap.state` and only events with seq > snap.at_seq are applied; otherwise
333    /// starts at `initial_state` and replays from seq 1.
334    pub fn replay_from_snapshot(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
335        let from_snap = self.load_latest_snapshot(run_id)?;
336        self.replay_from(run_id, initial_state, from_snap.as_ref())
337    }
338
339    /// Replay-only: no executor or step is called; recorded outputs are applied from the event log.
340    fn replay_from(
341        &self,
342        run_id: &RunId,
343        initial_state: S,
344        snapshot: Option<&Snapshot<S>>,
345    ) -> Result<S, KernelError> {
346        const FROM_SEQ: Seq = 1;
347        let (mut state, from_seq) = match snapshot {
348            Some(snap) => (snap.state.clone(), snap.at_seq + 1),
349            None => (initial_state, FROM_SEQ),
350        };
351        let sequenced = self.events.scan(run_id, from_seq)?;
352        for se in sequenced {
353            self.reducer.apply(&mut state, &se)?;
354        }
355        Ok(state)
356    }
357
358    /// Builds a run timeline (event list + final status) for audit/debugging. Serialize to JSON for UI/CLI.
359    pub fn run_timeline(&self, run_id: &RunId) -> Result<timeline::RunTimeline, KernelError> {
360        timeline::run_timeline(self.events.as_ref(), run_id)
361    }
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367    use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
368    use crate::kernel::event::Event;
369    use crate::kernel::event_store::{InMemoryEventStore, SharedEventStore};
370    use crate::kernel::policy::RetryWithBackoffPolicy;
371    use crate::kernel::runtime_effect::RuntimeEffect;
372    use crate::kernel::snapshot::{InMemorySnapshotStore, SnapshotStore};
373    use crate::kernel::stubs::{AllowAllPolicy, NoopActionExecutor, NoopStepFn};
374    use crate::kernel::StateUpdatedOnlyReducer;
375    use serde::{Deserialize, Serialize};
376    use std::sync::atomic::{AtomicUsize, Ordering};
377    use std::sync::{Arc, Mutex};
378
379    #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
380    struct TestState(u32);
381    impl KernelState for TestState {
382        fn version(&self) -> u32 {
383            1
384        }
385    }
386
387    /// Mock executor that counts execute() calls; used to assert replay does not call executor.
388    struct CountingActionExecutor(Arc<AtomicUsize>);
389    impl CountingActionExecutor {
390        fn new(counter: Arc<AtomicUsize>) -> Self {
391            Self(counter)
392        }
393        fn count(&self) -> usize {
394            self.0.load(Ordering::SeqCst)
395        }
396    }
397    impl ActionExecutor for CountingActionExecutor {
398        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
399            self.0.fetch_add(1, Ordering::SeqCst);
400            Err(KernelError::Driver("mock".into()))
401        }
402    }
403
404    struct SharedSnapshotStoreHandle<S>(Arc<InMemorySnapshotStore<S>>);
405    impl<S: Clone + Send + Sync> SnapshotStore<S> for SharedSnapshotStoreHandle<S> {
406        fn load_latest(&self, run_id: &RunId) -> Result<Option<Snapshot<S>>, KernelError> {
407            self.0.load_latest(run_id)
408        }
409
410        fn save(&self, snapshot: &Snapshot<S>) -> Result<(), KernelError> {
411            self.0.save(snapshot)
412        }
413    }
414
415    struct CountingStateReducer(Arc<AtomicUsize>);
416    impl Reducer<TestState> for CountingStateReducer {
417        fn apply(&self, state: &mut TestState, event: &SequencedEvent) -> Result<(), KernelError> {
418            self.0.fetch_add(1, Ordering::SeqCst);
419            if let Event::StateUpdated { payload, .. } = &event.event {
420                *state = serde_json::from_value(payload.clone())
421                    .map_err(|e| KernelError::Reducer(e.to_string()))?;
422            }
423            Ok(())
424        }
425    }
426
427    /// Sink that collects effects into a shared Vec for tests.
428    struct CollectingEffectSink(Arc<Mutex<Vec<(String, RuntimeEffect)>>>);
429    impl crate::kernel::runtime_effect::EffectSink for CollectingEffectSink {
430        fn record(&self, run_id: &RunId, effect: &RuntimeEffect) {
431            self.0
432                .lock()
433                .unwrap()
434                .push((run_id.clone(), effect.clone()));
435        }
436    }
437
438    /// Step that emits one StateUpdated then completes (for effect-capture test).
439    struct EmitOnceThenCompleteStep(AtomicUsize);
440    impl StepFn<TestState> for EmitOnceThenCompleteStep {
441        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
442            if self.0.fetch_add(1, Ordering::SeqCst) == 0 {
443                Ok(Next::Emit(vec![Event::StateUpdated {
444                    step_id: Some("node1".into()),
445                    payload: serde_json::to_value(&TestState(1)).unwrap(),
446                }]))
447            } else {
448                Ok(Next::Complete)
449            }
450        }
451    }
452
453    #[test]
454    fn run_until_blocked_complete() {
455        let k = Kernel::<TestState> {
456            events: Box::new(InMemoryEventStore::new()),
457            snaps: None,
458            reducer: Box::new(StateUpdatedOnlyReducer),
459            exec: Box::new(NoopActionExecutor),
460            step: Box::new(NoopStepFn),
461            policy: Box::new(AllowAllPolicy),
462            effect_sink: None,
463            mode: KernelMode::Normal,
464        };
465        let run_id = "run-complete".to_string();
466        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
467        assert!(matches!(status, RunStatus::Completed));
468    }
469
470    #[test]
471    fn run_until_blocked_persists_latest_snapshot_on_completion() {
472        let snapshots = Arc::new(InMemorySnapshotStore::new());
473        let k = Kernel::<TestState> {
474            events: Box::new(InMemoryEventStore::new()),
475            snaps: Some(Box::new(SharedSnapshotStoreHandle(Arc::clone(&snapshots)))),
476            reducer: Box::new(StateUpdatedOnlyReducer),
477            exec: Box::new(NoopActionExecutor),
478            step: Box::new(EmitOnceThenCompleteStep(AtomicUsize::new(0))),
479            policy: Box::new(AllowAllPolicy),
480            effect_sink: None,
481            mode: KernelMode::Normal,
482        };
483        let run_id = "run-snapshot-complete".to_string();
484        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
485        assert!(matches!(status, RunStatus::Completed));
486
487        let snapshot = snapshots.load_latest(&run_id).unwrap().unwrap();
488        assert_eq!(
489            snapshot.at_seq, 2,
490            "StateUpdated + Completed should both checkpoint"
491        );
492        assert_eq!(snapshot.state, TestState(1));
493    }
494
495    #[test]
496    fn kernel_replay_mode_determinism_guard_traps_clock() {
497        let k = Kernel::<TestState> {
498            events: Box::new(InMemoryEventStore::new()),
499            snaps: None,
500            reducer: Box::new(StateUpdatedOnlyReducer),
501            exec: Box::new(NoopActionExecutor),
502            step: Box::new(NoopStepFn),
503            policy: Box::new(AllowAllPolicy),
504            effect_sink: None,
505            mode: KernelMode::Replay,
506        };
507        let guard = k.determinism_guard();
508        let err = guard.check_clock_access().unwrap_err();
509        assert!(err.to_string().contains("Replay"));
510    }
511
512    #[test]
513    fn effect_sink_captures_state_write_and_complete() {
514        let effects: Arc<Mutex<Vec<(String, RuntimeEffect)>>> = Arc::new(Mutex::new(Vec::new()));
515        let sink = CollectingEffectSink(Arc::clone(&effects));
516        let k = Kernel::<TestState> {
517            events: Box::new(InMemoryEventStore::new()),
518            snaps: None,
519            reducer: Box::new(StateUpdatedOnlyReducer),
520            exec: Box::new(NoopActionExecutor),
521            step: Box::new(EmitOnceThenCompleteStep(AtomicUsize::new(0))),
522            policy: Box::new(AllowAllPolicy),
523            effect_sink: Some(Box::new(sink)),
524            mode: KernelMode::Normal,
525        };
526        let run_id = "run-effect-capture".to_string();
527        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
528        assert!(matches!(status, RunStatus::Completed));
529        let recorded = effects.lock().unwrap();
530        assert_eq!(
531            recorded.len(),
532            1,
533            "one effect (StateWrite) should be captured"
534        );
535        match &recorded[0].1 {
536            RuntimeEffect::StateWrite { step_id, payload } => {
537                assert_eq!(step_id.as_deref(), Some("node1"));
538                let s: TestState = serde_json::from_value(payload.clone()).unwrap();
539                assert_eq!(s.0, 1);
540            }
541            _ => panic!("expected StateWrite"),
542        }
543    }
544
545    #[test]
546    fn run_timeline_after_complete_has_events_and_json() {
547        let store = InMemoryEventStore::new();
548        let k = Kernel::<TestState> {
549            events: Box::new(store),
550            snaps: None,
551            reducer: Box::new(StateUpdatedOnlyReducer),
552            exec: Box::new(NoopActionExecutor),
553            step: Box::new(NoopStepFn),
554            policy: Box::new(AllowAllPolicy),
555            effect_sink: None,
556            mode: KernelMode::Normal,
557        };
558        let run_id = "timeline-run".to_string();
559        let _ = k.run_until_blocked(&run_id, TestState(0)).unwrap();
560        let tl = k.run_timeline(&run_id).unwrap();
561        assert_eq!(tl.run_id, run_id);
562        let kinds: Vec<&str> = tl.events.iter().map(|e| e.kind.as_str()).collect();
563        assert!(
564            kinds.contains(&"Completed"),
565            "timeline should contain Completed"
566        );
567        let json = serde_json::to_string(&tl).unwrap();
568        assert!(json.contains("Completed"));
569    }
570
571    struct InterruptOnceStep(bool);
572    impl StepFn<TestState> for InterruptOnceStep {
573        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
574            if !self.0 {
575                Ok(Next::Interrupt(InterruptInfo {
576                    value: serde_json::json!("pause"),
577                }))
578            } else {
579                Ok(Next::Complete)
580            }
581        }
582    }
583
584    #[test]
585    fn run_until_blocked_then_resume() {
586        let inner = Arc::new(InMemoryEventStore::new());
587        let events = Box::new(SharedEventStore(inner.clone()));
588        let run_id = "run-interrupt-resume".to_string();
589        let k = Kernel::<TestState> {
590            events,
591            snaps: None,
592            reducer: Box::new(StateUpdatedOnlyReducer),
593            exec: Box::new(NoopActionExecutor),
594            step: Box::new(InterruptOnceStep(false)),
595            policy: Box::new(AllowAllPolicy),
596            effect_sink: None,
597            mode: KernelMode::Normal,
598        };
599        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
600        assert!(matches!(status, RunStatus::Blocked(_)));
601
602        let k2 = Kernel::<TestState> {
603            events: Box::new(SharedEventStore(inner)),
604            snaps: None,
605            reducer: Box::new(StateUpdatedOnlyReducer),
606            exec: Box::new(NoopActionExecutor),
607            step: Box::new(InterruptOnceStep(true)),
608            policy: Box::new(AllowAllPolicy),
609            effect_sink: None,
610            mode: KernelMode::Normal,
611        };
612        let status2 = k2
613            .resume(&run_id, TestState(0), Signal::Resume(serde_json::json!(1)))
614            .unwrap();
615        assert!(matches!(status2, RunStatus::Completed));
616    }
617
618    #[test]
619    fn interrupt_saves_snapshot_before_returning_blocked() {
620        let snapshots = Arc::new(InMemorySnapshotStore::new());
621        let k = Kernel::<TestState> {
622            events: Box::new(InMemoryEventStore::new()),
623            snaps: Some(Box::new(SharedSnapshotStoreHandle(Arc::clone(&snapshots)))),
624            reducer: Box::new(StateUpdatedOnlyReducer),
625            exec: Box::new(NoopActionExecutor),
626            step: Box::new(InterruptOnceStep(false)),
627            policy: Box::new(AllowAllPolicy),
628            effect_sink: None,
629            mode: KernelMode::Normal,
630        };
631        let run_id = "run-interrupt-checkpoint".to_string();
632
633        let status = k.run_until_blocked(&run_id, TestState(7)).unwrap();
634        assert!(matches!(status, RunStatus::Blocked(_)));
635
636        let snapshot = snapshots.load_latest(&run_id).unwrap().unwrap();
637        assert_eq!(snapshot.at_seq, 1);
638        assert_eq!(snapshot.state, TestState(7));
639    }
640
641    /// Replay must not call ActionExecutor (0 side effects).
642    #[test]
643    fn replay_no_side_effects() {
644        let exec_count = Arc::new(AtomicUsize::new(0));
645        let store = InMemoryEventStore::new();
646        let run_id = "replay-no-effects".to_string();
647        store
648            .append(
649                &run_id,
650                &[
651                    Event::StateUpdated {
652                        step_id: Some("n1".into()),
653                        payload: serde_json::json!(42),
654                    },
655                    Event::Completed,
656                ],
657            )
658            .unwrap();
659        let k = Kernel::<TestState> {
660            events: Box::new(store),
661            snaps: None,
662            reducer: Box::new(StateUpdatedOnlyReducer),
663            exec: Box::new(CountingActionExecutor::new(Arc::clone(&exec_count))),
664            step: Box::new(NoopStepFn),
665            policy: Box::new(AllowAllPolicy),
666            effect_sink: None,
667            mode: KernelMode::Normal,
668        };
669        let _ = k.replay(&run_id, TestState(0)).unwrap();
670        assert_eq!(
671            exec_count.load(Ordering::SeqCst),
672            0,
673            "replay must not call executor"
674        );
675    }
676
677    /// Same event log and initial state must yield identical state on multiple replays.
678    #[test]
679    fn replay_state_equivalence() {
680        let store = InMemoryEventStore::new();
681        let run_id = "replay-equiv".to_string();
682        store
683            .append(
684                &run_id,
685                &[
686                    Event::StateUpdated {
687                        step_id: Some("a".into()),
688                        payload: serde_json::json!(10),
689                    },
690                    Event::StateUpdated {
691                        step_id: Some("b".into()),
692                        payload: serde_json::json!(20),
693                    },
694                    Event::Completed,
695                ],
696            )
697            .unwrap();
698        let k = Kernel::<TestState> {
699            events: Box::new(SharedEventStore(Arc::new(store))),
700            snaps: None,
701            reducer: Box::new(StateUpdatedOnlyReducer),
702            exec: Box::new(NoopActionExecutor),
703            step: Box::new(NoopStepFn),
704            policy: Box::new(AllowAllPolicy),
705            effect_sink: None,
706            mode: KernelMode::Normal,
707        };
708        let initial = TestState(0);
709        let s1 = k.replay(&run_id, initial.clone()).unwrap();
710        let s2 = k.replay(&run_id, initial).unwrap();
711        assert_eq!(s1, s2, "same log and initial state must yield equal state");
712        assert_eq!(s1.0, 20);
713    }
714
715    /// Executor that fails with Transient up to N times then succeeds (for retry integration test).
716    struct TransientThenSuccessExecutor {
717        fail_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
718        fail_up_to: usize,
719    }
720    impl TransientThenSuccessExecutor {
721        fn new(fail_up_to: usize) -> Self {
722            Self {
723                fail_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
724                fail_up_to,
725            }
726        }
727    }
728    impl ActionExecutor for TransientThenSuccessExecutor {
729        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
730            let n = self
731                .fail_count
732                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
733            if n < self.fail_up_to {
734                Err(KernelError::Executor(ActionError::transient("transient")))
735            } else {
736                Ok(ActionResult::Success(serde_json::json!("ok")))
737            }
738        }
739    }
740
741    #[test]
742    fn run_until_blocked_retry_then_success() {
743        use crate::kernel::policy::RetryWithBackoffPolicy;
744        let store = Arc::new(InMemoryEventStore::new());
745        let run_id = "run-retry-ok".to_string();
746        let k = Kernel::<TestState> {
747            events: Box::new(SharedEventStore(Arc::clone(&store))),
748            snaps: None,
749            reducer: Box::new(StateUpdatedOnlyReducer),
750            exec: Box::new(TransientThenSuccessExecutor::new(2)),
751            step: Box::new(DoOnceThenCompleteStep::new()),
752            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
753            effect_sink: None,
754            mode: KernelMode::Normal,
755        };
756        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
757        assert!(matches!(status, RunStatus::Completed));
758        let events = store.scan(&run_id, 1).unwrap();
759        let succeeded = events
760            .iter()
761            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
762            .count();
763        let failed = events
764            .iter()
765            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
766            .count();
767        assert_eq!(succeeded, 1, "exactly one ActionSucceeded after retries");
768        assert_eq!(failed, 0, "no ActionFailed when retries eventually succeed");
769    }
770
771    /// Failure path: executor returns Err → RunStatus::Failed and event log has ActionFailed for that action_id.
772    #[test]
773    fn run_until_blocked_failure_recovery() {
774        let store = Arc::new(InMemoryEventStore::new());
775        let run_id = "run-fail".to_string();
776        let k = Kernel::<TestState> {
777            events: Box::new(SharedEventStore(Arc::clone(&store))),
778            snaps: None,
779            reducer: Box::new(StateUpdatedOnlyReducer),
780            exec: Box::new(CountingActionExecutor::new(Arc::new(AtomicUsize::new(0)))),
781            step: Box::new(DoOnceThenCompleteStep::new()),
782            policy: Box::new(AllowAllPolicy),
783            effect_sink: None,
784            mode: KernelMode::Normal,
785        };
786        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
787        assert!(
788            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
789            "default policy says Fail so recoverable should be false"
790        );
791        let events = store.scan(&run_id, 1).unwrap();
792        let has_requested = events
793            .iter()
794            .any(|e| matches!(e.event, Event::ActionRequested { .. }));
795        let has_failed = events
796            .iter()
797            .any(|e| matches!(e.event, Event::ActionFailed { .. }));
798        assert!(
799            has_requested && has_failed,
800            "event log must contain ActionRequested and ActionFailed for the same action"
801        );
802    }
803
804    /// Step that returns Next::Do once then Next::Complete so the driver executes one action then finishes.
805    struct DoOnceThenCompleteStep {
806        called: std::sync::Arc<std::sync::atomic::AtomicUsize>,
807    }
808    impl DoOnceThenCompleteStep {
809        fn new() -> Self {
810            Self {
811                called: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
812            }
813        }
814    }
815    impl StepFn<TestState> for DoOnceThenCompleteStep {
816        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
817            let n = self
818                .called
819                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
820            if n == 0 {
821                Ok(Next::Do(Action::CallTool {
822                    tool: "dummy".into(),
823                    input: serde_json::json!(null),
824                }))
825            } else {
826                Ok(Next::Complete)
827            }
828        }
829    }
830
831    /// Step that does one action first, then completes.
832    struct DoThenCompleteStep {
833        called: AtomicUsize,
834    }
835    impl DoThenCompleteStep {
836        fn new() -> Self {
837            Self {
838                called: AtomicUsize::new(0),
839            }
840        }
841    }
842    impl StepFn<TestState> for DoThenCompleteStep {
843        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
844            let prev = self.called.fetch_add(1, Ordering::SeqCst);
845            if prev == 0 {
846                Ok(Next::Do(Action::CallTool {
847                    tool: "dummy".into(),
848                    input: serde_json::json!(null),
849                }))
850            } else {
851                Ok(Next::Complete)
852            }
853        }
854    }
855
856    /// Executor that returns pre-scripted results in order.
857    struct ScriptedActionExecutor {
858        responses: Mutex<Vec<Result<ActionResult, KernelError>>>,
859        calls: AtomicUsize,
860    }
861    impl ScriptedActionExecutor {
862        fn new(responses: Vec<Result<ActionResult, KernelError>>) -> Self {
863            Self {
864                responses: Mutex::new(responses),
865                calls: AtomicUsize::new(0),
866            }
867        }
868        fn calls(&self) -> usize {
869            self.calls.load(Ordering::SeqCst)
870        }
871    }
872    impl ActionExecutor for ScriptedActionExecutor {
873        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
874            self.calls.fetch_add(1, Ordering::SeqCst);
875            let mut guard = self.responses.lock().unwrap();
876            if guard.is_empty() {
877                return Err(KernelError::Driver("missing scripted response".into()));
878            }
879            guard.remove(0)
880        }
881    }
882
883    /// Replay from snapshot applies only events after at_seq.
884    #[test]
885    fn replay_from_snapshot_applies_tail_only() {
886        use crate::kernel::Snapshot;
887
888        let store = InMemoryEventStore::new();
889        let run_id = "replay-snap".to_string();
890        store
891            .append(
892                &run_id,
893                &[
894                    Event::StateUpdated {
895                        step_id: Some("a".into()),
896                        payload: serde_json::json!(10),
897                    },
898                    Event::StateUpdated {
899                        step_id: Some("b".into()),
900                        payload: serde_json::json!(20),
901                    },
902                    Event::StateUpdated {
903                        step_id: Some("c".into()),
904                        payload: serde_json::json!(30),
905                    },
906                    Event::Completed,
907                ],
908            )
909            .unwrap();
910        let snaps = InMemorySnapshotStore::new();
911        snaps
912            .save(&Snapshot {
913                run_id: run_id.clone(),
914                at_seq: 2,
915                state: TestState(20),
916            })
917            .unwrap();
918        let k = Kernel::<TestState> {
919            events: Box::new(store),
920            snaps: Some(Box::new(snaps)),
921            reducer: Box::new(StateUpdatedOnlyReducer),
922            exec: Box::new(NoopActionExecutor),
923            step: Box::new(NoopStepFn),
924            policy: Box::new(AllowAllPolicy),
925            effect_sink: None,
926            mode: KernelMode::Normal,
927        };
928        let state = k.replay_from_snapshot(&run_id, TestState(0)).unwrap();
929        assert_eq!(state.0, 30, "only events after at_seq=2 (seq 3) applied");
930    }
931
932    #[test]
933    fn run_loop_replays_only_tail_after_loading_latest_snapshot() {
934        let store = InMemoryEventStore::new();
935        let run_id = "run-tail-only".to_string();
936        store
937            .append(
938                &run_id,
939                &[
940                    Event::StateUpdated {
941                        step_id: Some("a".into()),
942                        payload: serde_json::json!(1),
943                    },
944                    Event::StateUpdated {
945                        step_id: Some("b".into()),
946                        payload: serde_json::json!(2),
947                    },
948                    Event::StateUpdated {
949                        step_id: Some("c".into()),
950                        payload: serde_json::json!(3),
951                    },
952                ],
953            )
954            .unwrap();
955
956        let snapshots = Arc::new(InMemorySnapshotStore::new());
957        snapshots
958            .save(&Snapshot {
959                run_id: run_id.clone(),
960                at_seq: 2,
961                state: TestState(2),
962            })
963            .unwrap();
964        let apply_count = Arc::new(AtomicUsize::new(0));
965        let k = Kernel::<TestState> {
966            events: Box::new(store),
967            snaps: Some(Box::new(SharedSnapshotStoreHandle(Arc::clone(&snapshots)))),
968            reducer: Box::new(CountingStateReducer(Arc::clone(&apply_count))),
969            exec: Box::new(NoopActionExecutor),
970            step: Box::new(NoopStepFn),
971            policy: Box::new(AllowAllPolicy),
972            effect_sink: None,
973            mode: KernelMode::Normal,
974        };
975
976        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
977        assert!(matches!(status, RunStatus::Completed));
978        assert_eq!(
979            apply_count.load(Ordering::SeqCst),
980            2,
981            "only the tail event plus the new Completed event should be reduced"
982        );
983
984        let snapshot = snapshots.load_latest(&run_id).unwrap().unwrap();
985        assert_eq!(snapshot.at_seq, 4);
986        assert_eq!(snapshot.state, TestState(3));
987    }
988
989    #[test]
990    fn action_result_failure_returns_failed_and_single_terminal_event() {
991        let store = Arc::new(InMemoryEventStore::new());
992        let run_id = "run-action-failure".to_string();
993        let k = Kernel::<TestState> {
994            events: Box::new(SharedEventStore(Arc::clone(&store))),
995            snaps: None,
996            reducer: Box::new(StateUpdatedOnlyReducer),
997            exec: Box::new(ScriptedActionExecutor::new(vec![Ok(
998                ActionResult::Failure("boom".into()),
999            )])),
1000            step: Box::new(DoThenCompleteStep::new()),
1001            policy: Box::new(AllowAllPolicy),
1002            effect_sink: None,
1003            mode: KernelMode::Normal,
1004        };
1005
1006        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
1007        assert!(
1008            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
1009            "ActionResult::Failure must fail the run"
1010        );
1011
1012        let events = store.scan(&run_id, 1).unwrap();
1013        let mut requested_id: Option<String> = None;
1014        let mut success_count = 0usize;
1015        let mut failed_count = 0usize;
1016        for e in &events {
1017            match &e.event {
1018                Event::ActionRequested { action_id, .. } => requested_id = Some(action_id.clone()),
1019                Event::ActionSucceeded { .. } => success_count += 1,
1020                Event::ActionFailed { action_id, .. } => {
1021                    failed_count += 1;
1022                    assert_eq!(
1023                        requested_id.as_deref(),
1024                        Some(action_id.as_str()),
1025                        "failed event must match requested action_id"
1026                    );
1027                }
1028                _ => {}
1029            }
1030        }
1031        assert_eq!(success_count, 0);
1032        assert_eq!(
1033            failed_count, 1,
1034            "only one terminal failure event is expected"
1035        );
1036    }
1037
1038    #[test]
1039    fn retry_then_success_has_single_terminal_success_event() {
1040        let store = Arc::new(InMemoryEventStore::new());
1041        let run_id = "run-retry-success".to_string();
1042        let exec = Arc::new(ScriptedActionExecutor::new(vec![
1043            Err(KernelError::Executor(ActionError::transient("transient-1"))),
1044            Err(KernelError::Executor(ActionError::transient("transient-2"))),
1045            Ok(ActionResult::Success(serde_json::json!("ok"))),
1046        ]));
1047        let k = Kernel::<TestState> {
1048            events: Box::new(SharedEventStore(Arc::clone(&store))),
1049            snaps: None,
1050            reducer: Box::new(StateUpdatedOnlyReducer),
1051            exec: Box::new(ArcExecutor(Arc::clone(&exec))),
1052            step: Box::new(DoThenCompleteStep::new()),
1053            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
1054            effect_sink: None,
1055            mode: KernelMode::Normal,
1056        };
1057
1058        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
1059        assert!(matches!(status, RunStatus::Completed));
1060        assert_eq!(exec.calls(), 3, "executor should be called for retries");
1061
1062        let events = store.scan(&run_id, 1).unwrap();
1063        let requested_count = events
1064            .iter()
1065            .filter(|e| matches!(e.event, Event::ActionRequested { .. }))
1066            .count();
1067        let success_count = events
1068            .iter()
1069            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
1070            .count();
1071        let failed_count = events
1072            .iter()
1073            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
1074            .count();
1075        assert_eq!(
1076            requested_count, 1,
1077            "retry must not duplicate ActionRequested"
1078        );
1079        assert_eq!(
1080            success_count, 1,
1081            "exactly one terminal success event expected"
1082        );
1083        assert_eq!(failed_count, 0, "success path must not emit ActionFailed");
1084    }
1085
1086    #[test]
1087    fn retry_exhausted_has_single_terminal_failed_event() {
1088        let store = Arc::new(InMemoryEventStore::new());
1089        let run_id = "run-retry-fail".to_string();
1090        let exec = Arc::new(ScriptedActionExecutor::new(vec![
1091            Err(KernelError::Executor(ActionError::transient("transient-1"))),
1092            Err(KernelError::Executor(ActionError::transient("transient-2"))),
1093            Err(KernelError::Executor(ActionError::transient("transient-3"))),
1094        ]));
1095        let k = Kernel::<TestState> {
1096            events: Box::new(SharedEventStore(Arc::clone(&store))),
1097            snaps: None,
1098            reducer: Box::new(StateUpdatedOnlyReducer),
1099            exec: Box::new(ArcExecutor(Arc::clone(&exec))),
1100            step: Box::new(DoThenCompleteStep::new()),
1101            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 1, 0)),
1102            effect_sink: None,
1103            mode: KernelMode::Normal,
1104        };
1105
1106        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
1107        assert!(
1108            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
1109            "exhausted retries should fail run"
1110        );
1111        assert_eq!(exec.calls(), 2, "max_retries=1 should execute twice");
1112
1113        let events = store.scan(&run_id, 1).unwrap();
1114        let requested_count = events
1115            .iter()
1116            .filter(|e| matches!(e.event, Event::ActionRequested { .. }))
1117            .count();
1118        let success_count = events
1119            .iter()
1120            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
1121            .count();
1122        let failed_count = events
1123            .iter()
1124            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
1125            .count();
1126        assert_eq!(
1127            requested_count, 1,
1128            "retry must not duplicate ActionRequested"
1129        );
1130        assert_eq!(success_count, 0);
1131        assert_eq!(
1132            failed_count, 1,
1133            "exactly one terminal failed event expected"
1134        );
1135    }
1136
1137    struct ArcExecutor(Arc<ScriptedActionExecutor>);
1138    impl ActionExecutor for ArcExecutor {
1139        fn execute(&self, run_id: &RunId, action: &Action) -> Result<ActionResult, KernelError> {
1140            self.0.execute(run_id, action)
1141        }
1142    }
1143}