1use std::time::Duration;
4
5use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
6use crate::kernel::determinism_guard::DeterminismGuard;
7use crate::kernel::event::{Event, EventStore};
8use crate::kernel::identity::{RunId, Seq};
9use crate::kernel::kernel_mode::KernelMode;
10use crate::kernel::policy::{Policy, PolicyCtx, RetryDecision};
11use crate::kernel::reducer::Reducer;
12use crate::kernel::runtime_effect::{EffectSink, RuntimeEffect};
13use crate::kernel::snapshot::{Snapshot, SnapshotStore};
14use crate::kernel::state::KernelState;
15use crate::kernel::step::{InterruptInfo, Next, StepFn};
16use crate::kernel::timeline;
17use crate::kernel::KernelError;
18
/// Terminal or intermediate status returned by the kernel's run entry points.
#[derive(Clone, Debug)]
pub enum RunStatus {
    /// The step function returned `Next::Complete`; a `Completed` event was logged.
    Completed,
    /// The run is paused on an interrupt (or, per `BlockedInfo`, a wait-signal).
    Blocked(BlockedInfo),
    /// The run is still in progress (not produced by `run_loop` itself in this file).
    Running,
    /// An action failed terminally. `recoverable` is always `false` on the
    /// paths visible in this file — NOTE(review): confirm whether a
    /// recoverable failure path exists elsewhere.
    Failed { recoverable: bool },
}
31
/// Details about why a run is blocked.
#[derive(Clone, Debug)]
pub struct BlockedInfo {
    /// Set when the run paused on `Next::Interrupt`; carries the interrupt payload.
    pub interrupt: Option<InterruptInfo>,
    /// Name of an awaited external signal, if any. Always `None` on the
    /// interrupt path in this file.
    pub wait_signal: Option<String>,
}
37
/// External input used to unblock a paused run via [`Kernel::resume`].
#[derive(Clone, Debug)]
pub enum Signal {
    /// Plain resume carrying an arbitrary JSON payload.
    Resume(serde_json::Value),
    /// Named signal with a JSON payload. Note: `resume` only persists the
    /// `value`; the `name` is not written to the event log.
    Signal {
        name: String,
        value: serde_json::Value,
    },
}
47
/// Event-sourced execution kernel: state is always derived by folding the
/// event log through `reducer`; `step` decides the next move; `exec` performs
/// side-effecting actions; `policy` authorizes actions and drives retries.
pub struct Kernel<S: KernelState> {
    /// Append-only event log, keyed by run id.
    pub events: Box<dyn EventStore>,
    /// Optional snapshot store used by `replay_from_snapshot` to skip log prefixes.
    pub snaps: Option<Box<dyn SnapshotStore<S>>>,
    /// Folds events into state; the only way state ever changes.
    pub reducer: Box<dyn Reducer<S>>,
    /// Executes `Action`s against the outside world.
    pub exec: Box<dyn ActionExecutor>,
    /// Pure-ish decision function: state -> next kernel move.
    pub step: Box<dyn StepFn<S>>,
    /// Authorization and retry policy applied before/around action execution.
    pub policy: Box<dyn Policy>,
    /// Optional observer that records runtime effects (LLM/tool calls, state writes).
    pub effect_sink: Option<Box<dyn EffectSink>>,
    /// Normal vs. Replay; Replay arms the determinism guard.
    pub mode: KernelMode,
}
61
impl<S: KernelState> Kernel<S> {
    /// Builds a determinism guard for the kernel's current mode.
    ///
    /// In `KernelMode::Replay` the guard rejects non-deterministic access
    /// (the tests below show it trapping clock access).
    pub fn determinism_guard(&self) -> DeterminismGuard {
        DeterminismGuard::new(self.mode)
    }

    /// Drives the run forward until it completes, blocks on an interrupt,
    /// or fails terminally. State is first rebuilt from the full event log.
    pub fn run_until_blocked(
        &self,
        run_id: &RunId,
        initial_state: S,
    ) -> Result<RunStatus, KernelError> {
        self.run_loop(run_id, initial_state)
    }

    /// Resumes a blocked run: appends a `Resumed` event carrying the signal's
    /// payload, then re-enters the run loop (which replays the whole log,
    /// including the new event, before stepping again).
    pub fn resume(
        &self,
        run_id: &RunId,
        initial_state: S,
        signal: Signal,
    ) -> Result<RunStatus, KernelError> {
        // Both variants contribute only their JSON value. The `name` of a
        // named signal is dropped here — NOTE(review): confirm that not
        // persisting the signal name in the event log is intentional.
        let value = match &signal {
            Signal::Resume(v) => v.clone(),
            Signal::Signal { value, .. } => value.clone(),
        };
        self.events.append(run_id, &[Event::Resumed { value }])?;
        self.run_loop(run_id, initial_state)
    }

    /// Core event-sourced loop.
    ///
    /// 1. Rebuild `state` by folding every logged event through the reducer.
    /// 2. Repeatedly ask `step` for the next move:
    ///    - `Emit`: append the events and fold only the newly appended tail.
    ///    - `Do`: authorize, log `ActionRequested`, execute (with policy-driven
    ///      retries on executor errors), log the terminal
    ///      `ActionSucceeded`/`ActionFailed`, fold the tail.
    ///    - `Interrupt`: log `Interrupted` and return `Blocked`.
    ///    - `Complete`: log `Completed` and return `Completed`.
    fn run_loop(&self, run_id: &RunId, initial_state: S) -> Result<RunStatus, KernelError> {
        // Event sequence numbers start at 1; scanning from 1 replays the log.
        const FROM_SEQ: Seq = 1;
        let mut state = initial_state;
        let sequenced = self.events.scan(run_id, FROM_SEQ)?;
        for se in sequenced {
            self.reducer.apply(&mut state, &se)?;
        }

        loop {
            let next = self.step.next(&state)?;
            match next {
                Next::Emit(evs) => {
                    // Effects are recorded BEFORE the append; if `append`
                    // fails below, the sink has already seen the StateWrite.
                    // NOTE(review): confirm sinks tolerate that ordering.
                    if let Some(sink) = &self.effect_sink {
                        for ev in &evs {
                            if let Event::StateUpdated { step_id, payload } = ev {
                                sink.record(
                                    run_id,
                                    &RuntimeEffect::StateWrite {
                                        step_id: step_id.clone(),
                                        payload: payload.clone(),
                                    },
                                );
                            }
                        }
                    }
                    // NOTE(review): if `step` keeps returning an empty Emit,
                    // state never changes and this loop never terminates.
                    if !evs.is_empty() {
                        // Fold only the tail that was just appended (seq > before).
                        let before = self.events.head(run_id)?;
                        self.events.append(run_id, &evs)?;
                        let new_events = self.events.scan(run_id, before + 1)?;
                        for se in new_events {
                            self.reducer.apply(&mut state, &se)?;
                        }
                    }
                }
                Next::Do(action) => {
                    // Observability: mirror LLM/tool calls into the sink
                    // before authorization or execution.
                    if let Some(sink) = &self.effect_sink {
                        match &action {
                            Action::CallLLM { provider, input } => {
                                sink.record(
                                    run_id,
                                    &RuntimeEffect::LLMCall {
                                        provider: provider.clone(),
                                        input: input.clone(),
                                    },
                                );
                            }
                            Action::CallTool { tool, input } => {
                                sink.record(
                                    run_id,
                                    &RuntimeEffect::ToolCall {
                                        tool: tool.clone(),
                                        input: input.clone(),
                                    },
                                );
                            }
                            _ => {}
                        }
                    }
                    // Policy veto propagates as an error, aborting the loop.
                    self.policy
                        .authorize(run_id, &action, &PolicyCtx::default())?;
                    // The action id is derived from the seq the ActionRequested
                    // event will occupy, so it is stable across retries.
                    let before = self.events.head(run_id)?;
                    let action_id = format!("{}-{}", run_id, before + 1);
                    let payload = serde_json::to_value(&action)
                        .map_err(|e| KernelError::Driver(e.to_string()))?;
                    self.events.append(
                        run_id,
                        &[Event::ActionRequested {
                            action_id: action_id.clone(),
                            payload,
                        }],
                    )?;
                    let result = self.exec.execute(run_id, &action);
                    match result {
                        Ok(ActionResult::Success(output)) => {
                            self.events.append(
                                run_id,
                                &[Event::ActionSucceeded {
                                    action_id: action_id.clone(),
                                    output,
                                }],
                            )?;
                        }
                        // A Failure *result* (as opposed to an executor error)
                        // is terminal: no retries are attempted.
                        Ok(ActionResult::Failure(error)) => {
                            self.events
                                .append(run_id, &[Event::ActionFailed { action_id, error }])?;
                            return Ok(RunStatus::Failed { recoverable: false });
                        }
                        // Executor-level errors go through the retry policy.
                        Err(mut e) => {
                            let mut attempt = 0u32;
                            loop {
                                // attempt=0 asks the policy about the ORIGINAL
                                // failure; each pass re-executes once more.
                                let action_err = ActionError::from_kernel_error(&e);
                                let decision = self.policy.retry_strategy_attempt(
                                    &action_err,
                                    &action,
                                    attempt,
                                );
                                match decision {
                                    RetryDecision::Fail => {
                                        self.events.append(
                                            run_id,
                                            &[Event::ActionFailed {
                                                action_id: action_id.clone(),
                                                error: e.to_string(),
                                            }],
                                        )?;
                                        return Ok(RunStatus::Failed { recoverable: false });
                                    }
                                    // A zero-ms backoff is treated like an
                                    // immediate retry (no sleep).
                                    RetryDecision::Retry | RetryDecision::RetryAfterMs(0) => {}
                                    RetryDecision::RetryAfterMs(ms) => {
                                        // Blocking sleep: this kernel is synchronous.
                                        std::thread::sleep(Duration::from_millis(ms));
                                    }
                                }
                                attempt += 1;
                                // Re-execute; a new error feeds the next policy check.
                                match self.exec.execute(run_id, &action) {
                                    Ok(ActionResult::Success(output)) => {
                                        self.events.append(
                                            run_id,
                                            &[Event::ActionSucceeded {
                                                action_id: action_id.clone(),
                                                output,
                                            }],
                                        )?;
                                        break;
                                    }
                                    Ok(ActionResult::Failure(error)) => {
                                        self.events.append(
                                            run_id,
                                            &[Event::ActionFailed {
                                                action_id: action_id.clone(),
                                                error,
                                            }],
                                        )?;
                                        return Ok(RunStatus::Failed { recoverable: false });
                                    }
                                    Err(e2) => e = e2,
                                }
                            }
                        }
                    }
                    // Fold everything logged since `before` (ActionRequested
                    // plus the terminal event) into state.
                    let new_events = self.events.scan(run_id, before + 1)?;
                    for se in new_events {
                        self.reducer.apply(&mut state, &se)?;
                    }
                }
                Next::Interrupt(info) => {
                    if let Some(sink) = &self.effect_sink {
                        sink.record(
                            run_id,
                            &RuntimeEffect::InterruptRaise {
                                value: info.value.clone(),
                            },
                        );
                    }
                    self.events.append(
                        run_id,
                        &[Event::Interrupted {
                            value: info.value.clone(),
                        }],
                    )?;
                    return Ok(RunStatus::Blocked(BlockedInfo {
                        interrupt: Some(info),
                        wait_signal: None,
                    }));
                }
                Next::Complete => {
                    self.events.append(run_id, &[Event::Completed])?;
                    return Ok(RunStatus::Completed);
                }
            }
        }
    }

    /// Pure replay: fold the full event log into `initial_state` without
    /// executing any actions or recording any effects.
    pub fn replay(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
        self.replay_from(run_id, initial_state, None)
    }

    /// Replay starting from the latest stored snapshot, if one exists;
    /// snapshot-store load errors are silently treated as "no snapshot".
    pub fn replay_from_snapshot(&self, run_id: &RunId, initial_state: S) -> Result<S, KernelError> {
        let from_snap = self
            .snaps
            .as_ref()
            .and_then(|s| s.load_latest(run_id).ok().flatten());
        self.replay_from(run_id, initial_state, from_snap.as_ref())
    }

    /// Shared replay core: start from the snapshot state (folding only events
    /// after `at_seq`) or from `initial_state` at seq 1.
    fn replay_from(
        &self,
        run_id: &RunId,
        initial_state: S,
        snapshot: Option<&Snapshot<S>>,
    ) -> Result<S, KernelError> {
        const FROM_SEQ: Seq = 1;
        let (mut state, from_seq) = match snapshot {
            Some(snap) => (snap.state.clone(), snap.at_seq + 1),
            None => (initial_state, FROM_SEQ),
        };
        let sequenced = self.events.scan(run_id, from_seq)?;
        for se in sequenced {
            self.reducer.apply(&mut state, &se)?;
        }
        Ok(state)
    }

    /// Builds a serializable timeline view of the run's event log.
    pub fn run_timeline(&self, run_id: &RunId) -> Result<timeline::RunTimeline, KernelError> {
        timeline::run_timeline(self.events.as_ref(), run_id)
    }
}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::action::{Action, ActionError, ActionExecutor, ActionResult};
    use crate::kernel::event::Event;
    use crate::kernel::event_store::{InMemoryEventStore, SharedEventStore};
    use crate::kernel::policy::RetryWithBackoffPolicy;
    use crate::kernel::runtime_effect::RuntimeEffect;
    use crate::kernel::stubs::{AllowAllPolicy, NoopActionExecutor, NoopStepFn};
    use crate::kernel::StateUpdatedOnlyReducer;
    use serde::{Deserialize, Serialize};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    // Minimal kernel state: a single u32 the StateUpdated reducer overwrites.
    #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
    struct TestState(u32);
    impl KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    // Executor that counts invocations and always fails with a Driver error.
    struct CountingActionExecutor(Arc<AtomicUsize>);
    impl CountingActionExecutor {
        fn new(counter: Arc<AtomicUsize>) -> Self {
            Self(counter)
        }
        // Helper kept for symmetry; counts are usually read via the shared Arc.
        fn count(&self) -> usize {
            self.0.load(Ordering::SeqCst)
        }
    }
    impl ActionExecutor for CountingActionExecutor {
        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
            self.0.fetch_add(1, Ordering::SeqCst);
            Err(KernelError::Driver("mock".into()))
        }
    }

    // Effect sink that collects (run_id, effect) pairs for later inspection.
    struct CollectingEffectSink(Arc<Mutex<Vec<(String, RuntimeEffect)>>>);
    impl crate::kernel::runtime_effect::EffectSink for CollectingEffectSink {
        fn record(&self, run_id: &RunId, effect: &RuntimeEffect) {
            self.0
                .lock()
                .unwrap()
                .push((run_id.clone(), effect.clone()));
        }
    }

    // Step fn: emits one StateUpdated event on the first call, then completes.
    struct EmitOnceThenCompleteStep(AtomicUsize);
    impl StepFn<TestState> for EmitOnceThenCompleteStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            if self.0.fetch_add(1, Ordering::SeqCst) == 0 {
                Ok(Next::Emit(vec![Event::StateUpdated {
                    step_id: Some("node1".into()),
                    payload: serde_json::to_value(&TestState(1)).unwrap(),
                }]))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    // A no-op step immediately completes the run.
    #[test]
    fn run_until_blocked_complete() {
        let k = Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let run_id = "run-complete".to_string();
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
    }

    // In Replay mode the determinism guard must reject clock access.
    #[test]
    fn kernel_replay_mode_determinism_guard_traps_clock() {
        let k = Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Replay,
        };
        let guard = k.determinism_guard();
        let err = guard.check_clock_access().unwrap_err();
        assert!(err.to_string().contains("Replay"));
    }

    // A StateUpdated emit should surface as exactly one StateWrite effect.
    #[test]
    fn effect_sink_captures_state_write_and_complete() {
        let effects: Arc<Mutex<Vec<(String, RuntimeEffect)>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = CollectingEffectSink(Arc::clone(&effects));
        let k = Kernel::<TestState> {
            events: Box::new(InMemoryEventStore::new()),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(EmitOnceThenCompleteStep(AtomicUsize::new(0))),
            policy: Box::new(AllowAllPolicy),
            effect_sink: Some(Box::new(sink)),
            mode: KernelMode::Normal,
        };
        let run_id = "run-effect-capture".to_string();
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
        let recorded = effects.lock().unwrap();
        assert_eq!(
            recorded.len(),
            1,
            "one effect (StateWrite) should be captured"
        );
        match &recorded[0].1 {
            RuntimeEffect::StateWrite { step_id, payload } => {
                assert_eq!(step_id.as_deref(), Some("node1"));
                let s: TestState = serde_json::from_value(payload.clone()).unwrap();
                assert_eq!(s.0, 1);
            }
            _ => panic!("expected StateWrite"),
        }
    }

    // A completed run's timeline must contain a Completed event and serialize.
    #[test]
    fn run_timeline_after_complete_has_events_and_json() {
        let store = InMemoryEventStore::new();
        let k = Kernel::<TestState> {
            events: Box::new(store),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let run_id = "timeline-run".to_string();
        let _ = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        let tl = k.run_timeline(&run_id).unwrap();
        assert_eq!(tl.run_id, run_id);
        let kinds: Vec<&str> = tl.events.iter().map(|e| e.kind.as_str()).collect();
        assert!(
            kinds.contains(&"Completed"),
            "timeline should contain Completed"
        );
        let json = serde_json::to_string(&tl).unwrap();
        assert!(json.contains("Completed"));
    }

    // Step fn: interrupts while the flag is false, completes once it's true.
    // The flag is fixed at construction, so resuming uses a second instance.
    struct InterruptOnceStep(bool);
    impl StepFn<TestState> for InterruptOnceStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            if !self.0 {
                Ok(Next::Interrupt(InterruptInfo {
                    value: serde_json::json!("pause"),
                }))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    // Interrupt blocks the run; resuming over the same shared log completes it.
    #[test]
    fn run_until_blocked_then_resume() {
        let inner = Arc::new(InMemoryEventStore::new());
        let events = Box::new(SharedEventStore(inner.clone()));
        let run_id = "run-interrupt-resume".to_string();
        let k = Kernel::<TestState> {
            events,
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(InterruptOnceStep(false)),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Blocked(_)));

        // Second kernel shares the event store but its step now completes.
        let k2 = Kernel::<TestState> {
            events: Box::new(SharedEventStore(inner)),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(InterruptOnceStep(true)),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let status2 = k2
            .resume(&run_id, TestState(0), Signal::Resume(serde_json::json!(1)))
            .unwrap();
        assert!(matches!(status2, RunStatus::Completed));
    }

    // Replay must never invoke the action executor.
    #[test]
    fn replay_no_side_effects() {
        let exec_count = Arc::new(AtomicUsize::new(0));
        let store = InMemoryEventStore::new();
        let run_id = "replay-no-effects".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("n1".into()),
                        payload: serde_json::json!(42),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let k = Kernel::<TestState> {
            events: Box::new(store),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(CountingActionExecutor::new(Arc::clone(&exec_count))),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let _ = k.replay(&run_id, TestState(0)).unwrap();
        assert_eq!(
            exec_count.load(Ordering::SeqCst),
            0,
            "replay must not call executor"
        );
    }

    // Replaying the same log twice from the same initial state is deterministic.
    #[test]
    fn replay_state_equivalence() {
        let store = InMemoryEventStore::new();
        let run_id = "replay-equiv".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("a".into()),
                        payload: serde_json::json!(10),
                    },
                    Event::StateUpdated {
                        step_id: Some("b".into()),
                        payload: serde_json::json!(20),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::new(store))),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let initial = TestState(0);
        let s1 = k.replay(&run_id, initial.clone()).unwrap();
        let s2 = k.replay(&run_id, initial).unwrap();
        assert_eq!(s1, s2, "same log and initial state must yield equal state");
        assert_eq!(s1.0, 20);
    }

    // Executor that fails with a transient error `fail_up_to` times, then succeeds.
    struct TransientThenSuccessExecutor {
        fail_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
        fail_up_to: usize,
    }
    impl TransientThenSuccessExecutor {
        fn new(fail_up_to: usize) -> Self {
            Self {
                fail_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
                fail_up_to,
            }
        }
    }
    impl ActionExecutor for TransientThenSuccessExecutor {
        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
            let n = self
                .fail_count
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            if n < self.fail_up_to {
                Err(KernelError::Executor(ActionError::transient("transient")))
            } else {
                Ok(ActionResult::Success(serde_json::json!("ok")))
            }
        }
    }

    // Two transient failures then success, under a 3-retry policy: the run
    // completes with one ActionSucceeded and no ActionFailed in the log.
    #[test]
    fn run_until_blocked_retry_then_success() {
        use crate::kernel::policy::RetryWithBackoffPolicy;
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-retry-ok".to_string();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(TransientThenSuccessExecutor::new(2)),
            step: Box::new(DoOnceThenCompleteStep::new()),
            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
        let events = store.scan(&run_id, 1).unwrap();
        let succeeded = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
            .count();
        let failed = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
            .count();
        assert_eq!(succeeded, 1, "exactly one ActionSucceeded after retries");
        assert_eq!(failed, 0, "no ActionFailed when retries eventually succeed");
    }

    // With the no-retry default policy, an executor error fails the run and
    // leaves matching ActionRequested/ActionFailed entries in the log.
    #[test]
    fn run_until_blocked_failure_recovery() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-fail".to_string();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(CountingActionExecutor::new(Arc::new(AtomicUsize::new(0)))),
            step: Box::new(DoOnceThenCompleteStep::new()),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(
            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
            "default policy says Fail so recoverable should be false"
        );
        let events = store.scan(&run_id, 1).unwrap();
        let has_requested = events
            .iter()
            .any(|e| matches!(e.event, Event::ActionRequested { .. }));
        let has_failed = events
            .iter()
            .any(|e| matches!(e.event, Event::ActionFailed { .. }));
        assert!(
            has_requested && has_failed,
            "event log must contain ActionRequested and ActionFailed for the same action"
        );
    }

    // Step fn: requests one CallTool action on the first call, then completes.
    // (Duplicated below as DoThenCompleteStep with a plain AtomicUsize field.)
    struct DoOnceThenCompleteStep {
        called: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    }
    impl DoOnceThenCompleteStep {
        fn new() -> Self {
            Self {
                called: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            }
        }
    }
    impl StepFn<TestState> for DoOnceThenCompleteStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            let n = self
                .called
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            if n == 0 {
                Ok(Next::Do(Action::CallTool {
                    tool: "dummy".into(),
                    input: serde_json::json!(null),
                }))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    // Same behavior as DoOnceThenCompleteStep without the Arc wrapper.
    // NOTE(review): the two could be consolidated into one helper.
    struct DoThenCompleteStep {
        called: AtomicUsize,
    }
    impl DoThenCompleteStep {
        fn new() -> Self {
            Self {
                called: AtomicUsize::new(0),
            }
        }
    }
    impl StepFn<TestState> for DoThenCompleteStep {
        fn next(&self, _state: &TestState) -> Result<Next, KernelError> {
            let prev = self.called.fetch_add(1, Ordering::SeqCst);
            if prev == 0 {
                Ok(Next::Do(Action::CallTool {
                    tool: "dummy".into(),
                    input: serde_json::json!(null),
                }))
            } else {
                Ok(Next::Complete)
            }
        }
    }

    // Executor that replays a scripted list of responses in order and counts calls.
    struct ScriptedActionExecutor {
        responses: Mutex<Vec<Result<ActionResult, KernelError>>>,
        calls: AtomicUsize,
    }
    impl ScriptedActionExecutor {
        fn new(responses: Vec<Result<ActionResult, KernelError>>) -> Self {
            Self {
                responses: Mutex::new(responses),
                calls: AtomicUsize::new(0),
            }
        }
        fn calls(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }
    }
    impl ActionExecutor for ScriptedActionExecutor {
        fn execute(&self, _run_id: &RunId, _action: &Action) -> Result<ActionResult, KernelError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            let mut guard = self.responses.lock().unwrap();
            if guard.is_empty() {
                // Running off the end of the script is a test-setup bug.
                return Err(KernelError::Driver("missing scripted response".into()));
            }
            guard.remove(0)
        }
    }

    // Snapshot at seq 2 means only seq 3+ (payload 30) is folded on replay.
    #[test]
    fn replay_from_snapshot_applies_tail_only() {
        use crate::kernel::InMemorySnapshotStore;
        use crate::kernel::Snapshot;

        let store = InMemoryEventStore::new();
        let run_id = "replay-snap".to_string();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("a".into()),
                        payload: serde_json::json!(10),
                    },
                    Event::StateUpdated {
                        step_id: Some("b".into()),
                        payload: serde_json::json!(20),
                    },
                    Event::StateUpdated {
                        step_id: Some("c".into()),
                        payload: serde_json::json!(30),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let snaps = InMemorySnapshotStore::new();
        snaps
            .save(&Snapshot {
                run_id: run_id.clone(),
                at_seq: 2,
                state: TestState(20),
            })
            .unwrap();
        let k = Kernel::<TestState> {
            events: Box::new(store),
            snaps: Some(Box::new(snaps)),
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(NoopActionExecutor),
            step: Box::new(NoopStepFn),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };
        let state = k.replay_from_snapshot(&run_id, TestState(0)).unwrap();
        assert_eq!(state.0, 30, "only events after at_seq=2 (seq 3) applied");
    }

    // ActionResult::Failure (not an executor error) is terminal: no retry,
    // exactly one ActionFailed whose id matches the ActionRequested.
    #[test]
    fn action_result_failure_returns_failed_and_single_terminal_event() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-action-failure".to_string();
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(ScriptedActionExecutor::new(vec![Ok(
                ActionResult::Failure("boom".into()),
            )])),
            step: Box::new(DoThenCompleteStep::new()),
            policy: Box::new(AllowAllPolicy),
            effect_sink: None,
            mode: KernelMode::Normal,
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(
            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
            "ActionResult::Failure must fail the run"
        );

        let events = store.scan(&run_id, 1).unwrap();
        let mut requested_id: Option<String> = None;
        let mut success_count = 0usize;
        let mut failed_count = 0usize;
        for e in &events {
            match &e.event {
                Event::ActionRequested { action_id, .. } => requested_id = Some(action_id.clone()),
                Event::ActionSucceeded { .. } => success_count += 1,
                Event::ActionFailed { action_id, .. } => {
                    failed_count += 1;
                    assert_eq!(
                        requested_id.as_deref(),
                        Some(action_id.as_str()),
                        "failed event must match requested action_id"
                    );
                }
                _ => {}
            }
        }
        assert_eq!(success_count, 0);
        assert_eq!(
            failed_count, 1,
            "only one terminal failure event is expected"
        );
    }

    // Two transient errors then success: 3 executor calls, but only one
    // ActionRequested and one ActionSucceeded in the log.
    #[test]
    fn retry_then_success_has_single_terminal_success_event() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-retry-success".to_string();
        let exec = Arc::new(ScriptedActionExecutor::new(vec![
            Err(KernelError::Executor(ActionError::transient("transient-1"))),
            Err(KernelError::Executor(ActionError::transient("transient-2"))),
            Ok(ActionResult::Success(serde_json::json!("ok"))),
        ]));
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(ArcExecutor(Arc::clone(&exec))),
            step: Box::new(DoThenCompleteStep::new()),
            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 3, 0)),
            effect_sink: None,
            mode: KernelMode::Normal,
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(matches!(status, RunStatus::Completed));
        assert_eq!(exec.calls(), 3, "executor should be called for retries");

        let events = store.scan(&run_id, 1).unwrap();
        let requested_count = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionRequested { .. }))
            .count();
        let success_count = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
            .count();
        let failed_count = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
            .count();
        assert_eq!(
            requested_count, 1,
            "retry must not duplicate ActionRequested"
        );
        assert_eq!(
            success_count, 1,
            "exactly one terminal success event expected"
        );
        assert_eq!(failed_count, 0, "success path must not emit ActionFailed");
    }

    // max_retries=1 means two executions total; exhaustion yields exactly one
    // terminal ActionFailed and a non-recoverable failed run.
    #[test]
    fn retry_exhausted_has_single_terminal_failed_event() {
        let store = Arc::new(InMemoryEventStore::new());
        let run_id = "run-retry-fail".to_string();
        let exec = Arc::new(ScriptedActionExecutor::new(vec![
            Err(KernelError::Executor(ActionError::transient("transient-1"))),
            Err(KernelError::Executor(ActionError::transient("transient-2"))),
            Err(KernelError::Executor(ActionError::transient("transient-3"))),
        ]));
        let k = Kernel::<TestState> {
            events: Box::new(SharedEventStore(Arc::clone(&store))),
            snaps: None,
            reducer: Box::new(StateUpdatedOnlyReducer),
            exec: Box::new(ArcExecutor(Arc::clone(&exec))),
            step: Box::new(DoThenCompleteStep::new()),
            policy: Box::new(RetryWithBackoffPolicy::new(AllowAllPolicy, 1, 0)),
            effect_sink: None,
            mode: KernelMode::Normal,
        };

        let status = k.run_until_blocked(&run_id, TestState(0)).unwrap();
        assert!(
            matches!(status, RunStatus::Failed { recoverable } if !recoverable),
            "exhausted retries should fail run"
        );
        assert_eq!(exec.calls(), 2, "max_retries=1 should execute twice");

        let events = store.scan(&run_id, 1).unwrap();
        let requested_count = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionRequested { .. }))
            .count();
        let success_count = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionSucceeded { .. }))
            .count();
        let failed_count = events
            .iter()
            .filter(|e| matches!(e.event, Event::ActionFailed { .. }))
            .count();
        assert_eq!(
            requested_count, 1,
            "retry must not duplicate ActionRequested"
        );
        assert_eq!(success_count, 0);
        assert_eq!(
            failed_count, 1,
            "exactly one terminal failed event expected"
        );
    }

    // Adapter: lets a test keep an Arc handle to the executor (to read call
    // counts) while the kernel owns a Box<dyn ActionExecutor>.
    struct ArcExecutor(Arc<ScriptedActionExecutor>);
    impl ActionExecutor for ArcExecutor {
        fn execute(&self, run_id: &RunId, action: &Action) -> Result<ActionResult, KernelError> {
            self.0.execute(run_id, action)
        }
    }
}