// mfm_machine/runtime.rs

//! Unstable v4 runtime implementation (executor + resume logic).
//!
//! Source of truth: `docs/redesign.md` (v4).
//! Not part of the stable API contract (Appendix C.1).

use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::Mutex;
use tracing::{debug, info, instrument, warn};

use crate::attempt_envelope::{analyze_kernel_events, OrphanAttempt};
use crate::config::{BackoffPolicy, ExecutionMode, RunConfig, RunManifest};
use crate::context_runtime::write_full_snapshot_value;
use crate::engine::{ExecutionEngine, RunPhase, RunResult, StartRun, Stores};
use crate::errors::{ContextError, ErrorCategory, ErrorInfo, RunError, StorageError};
use crate::events::{Event, EventEnvelope, KernelEvent, RunStatus};
use crate::hashing::artifact_id_for_json;
use crate::ids::{ArtifactId, ErrorCode, OpId, RunId, StateId};
use crate::live_io::{FactIndex, LiveIoTransportFactory, UnimplementedLiveIoTransportFactory};
use crate::plan::{DependencyEdge, ExecutionPlan, PlanValidationError, StateNode};
use crate::stores::ArtifactStore;

mod attempt;
mod child_runs;
mod writer;

use writer::{append_kernel, EventWriter, SharedEventWriter};

pub use child_runs::ChildRunLiveIoTransportFactory;
33
/// Error code emitted when the run config requests an execution mode the
/// engine does not implement (currently anything other than `Sequential`).
const CODE_UNSUPPORTED_EXECUTION_MODE: &str = "unsupported_execution_mode";
35
/// Resolves an executable plan from a validated run manifest.
///
/// Runtime entry points use this trait to recreate a plan on `start` and `resume`
/// without hard-coding domain planners into the engine.
pub trait PlanResolver: Send + Sync {
    /// Builds the execution plan for `manifest`.
    ///
    /// # Errors
    /// Returns a [`RunError`] when the manifest cannot be turned into a plan
    /// (the concrete failure modes are implementation defined).
    fn resolve(&self, manifest: &RunManifest) -> Result<ExecutionPlan, RunError>;
}
44
/// Test-only hooks that let integration tests interrupt engine progress at known points.
#[derive(Clone, Default)]
pub struct EngineFailpoints {
    /// Stops the next state attempt after the handler returns but before the engine advances.
    ///
    /// The flag is single-use: once observed, it is reset to `false`.
    // Shared via `Arc` so clones of the failpoints (and of the engine holding
    // them) observe and consume the same one-shot flag.
    pub stop_after_handler_once: Arc<std::sync::atomic::AtomicBool>,
}
53
54impl EngineFailpoints {
55    fn should_stop_after_handler(&self) -> bool {
56        self.stop_after_handler_once
57            .swap(false, std::sync::atomic::Ordering::SeqCst)
58    }
59}
60
/// Default runtime implementation for the unstable v4 execution engine.
///
/// The engine validates the run contract, rehydrates facts from prior events, and
/// executes states in topological order with retry/resume semantics.
#[derive(Clone)]
pub struct DefaultExecutionEngine {
    /// Rebuilds the execution plan from a manifest on `start` and `resume`.
    resolver: Arc<dyn PlanResolver>,
    /// Creates live-IO transports for state execution in live mode; the
    /// default factory rejects all live IO.
    live_transport_factory: Arc<dyn LiveIoTransportFactory>,
    /// Optional test-only interruption hooks; `None` outside tests.
    failpoints: Option<EngineFailpoints>,
}
71
72impl DefaultExecutionEngine {
73    /// Creates an engine that resolves plans with `resolver` and rejects live IO by default.
74    pub fn new(resolver: Arc<dyn PlanResolver>) -> Self {
75        Self {
76            resolver,
77            live_transport_factory: Arc::new(UnimplementedLiveIoTransportFactory),
78            failpoints: None,
79        }
80    }
81
82    /// Installs the live IO transport factory used for state execution in live mode.
83    pub fn with_live_transport_factory(mut self, factory: Arc<dyn LiveIoTransportFactory>) -> Self {
84        self.live_transport_factory = factory;
85        self
86    }
87
88    /// Enables test failpoints for the returned engine instance.
89    pub fn with_failpoints(mut self, failpoints: EngineFailpoints) -> Self {
90        self.failpoints = Some(failpoints);
91        self
92    }
93}
94
95fn info(code: &'static str, category: ErrorCategory, message: &'static str) -> ErrorInfo {
96    ErrorInfo {
97        code: ErrorCode(code.to_string()),
98        category,
99        retryable: false,
100        message: message.to_string(),
101        details: None,
102    }
103}
104
105fn invalid_plan(code: &'static str, message: &'static str) -> RunError {
106    RunError::InvalidPlan(info(code, ErrorCategory::Unknown, message))
107}
108
109fn storage_not_found(code: &'static str, message: &'static str) -> StorageError {
110    StorageError::NotFound(info(code, ErrorCategory::Storage, message))
111}
112
113fn context_err(code: &'static str, message: &'static str) -> ContextError {
114    ContextError::Serialization(info(code, ErrorCategory::Context, message))
115}
116
117fn compute_backoff(policy: &BackoffPolicy, attempt: u32) -> Duration {
118    match policy {
119        BackoffPolicy::Fixed { delay } => *delay,
120        BackoffPolicy::Exponential {
121            base_delay,
122            max_delay,
123        } => {
124            let shift = attempt.min(31);
125            let factor = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
126            let scaled = base_delay.saturating_mul(factor);
127            if &scaled > max_delay {
128                *max_delay
129            } else {
130                scaled
131            }
132        }
133    }
134}
135
136fn validate_execution_mode(cfg: &RunConfig) -> Result<(), RunError> {
137    match cfg.execution_mode {
138        ExecutionMode::Sequential => Ok(()),
139        ExecutionMode::FanOutJoin { .. } => Err(RunError::InvalidPlan(info(
140            CODE_UNSUPPORTED_EXECUTION_MODE,
141            ErrorCategory::Unknown,
142            "execution_mode FanOutJoin is not supported",
143        ))),
144    }
145}
146
147fn validate_start_run_contract(run: &StartRun) -> Result<(), RunError> {
148    let value = serde_json::to_value(&run.manifest).map_err(|_| {
149        invalid_plan(
150            "manifest_serialize_failed",
151            "failed to serialize run manifest",
152        )
153    })?;
154    let computed = artifact_id_for_json(&value).map_err(|e| match e {
155        crate::hashing::CanonicalJsonError::FloatNotAllowed => invalid_plan(
156            "manifest_not_canonical",
157            "run manifest is not canonical-json-hashable (floats are forbidden)",
158        ),
159        crate::hashing::CanonicalJsonError::SecretsNotAllowed => invalid_plan(
160            "secrets_detected",
161            "run manifest contained secrets (policy forbids persisting secrets)",
162        ),
163    })?;
164    if computed != run.manifest_id {
165        return Err(invalid_plan(
166            "manifest_id_mismatch",
167            "manifest_id did not match canonical JSON hash of the manifest",
168        ));
169    }
170
171    if run.manifest.op_id != run.plan.op_id {
172        return Err(invalid_plan(
173            "manifest_op_id_mismatch",
174            "manifest.op_id did not match plan.op_id",
175        ));
176    }
177
178    if run.manifest.run_config != run.run_config {
179        return Err(invalid_plan(
180            "run_config_mismatch",
181            "run_config did not match manifest.run_config",
182        ));
183    }
184
185    Ok(())
186}
187
188fn topological_order(plan: &ExecutionPlan) -> Result<Vec<StateNode>, PlanValidationError> {
189    if plan.graph.states.is_empty() {
190        return Err(PlanValidationError::EmptyPlan);
191    }
192
193    let mut nodes_by_id: HashMap<StateId, StateNode> = HashMap::new();
194    for n in &plan.graph.states {
195        if nodes_by_id.contains_key(&n.id) {
196            return Err(PlanValidationError::DuplicateStateId {
197                state_id: n.id.clone(),
198            });
199        }
200        nodes_by_id.insert(n.id.clone(), n.clone());
201    }
202
203    let mut indegree: HashMap<StateId, usize> = HashMap::new();
204    let mut edges_from: HashMap<StateId, Vec<StateId>> = HashMap::new();
205    for id in nodes_by_id.keys() {
206        indegree.insert(id.clone(), 0);
207        edges_from.insert(id.clone(), Vec::new());
208    }
209
210    for DependencyEdge { from, to } in &plan.graph.edges {
211        if !nodes_by_id.contains_key(from) {
212            return Err(PlanValidationError::MissingStateForEdge {
213                missing: from.clone(),
214            });
215        }
216        if !nodes_by_id.contains_key(to) {
217            return Err(PlanValidationError::MissingStateForEdge {
218                missing: to.clone(),
219            });
220        }
221        edges_from.get_mut(from).unwrap().push(to.clone());
222        *indegree.get_mut(to).unwrap() += 1;
223    }
224
225    let mut queue = VecDeque::new();
226    for n in &plan.graph.states {
227        if indegree.get(&n.id).copied().unwrap_or(0) == 0 {
228            queue.push_back(n.id.clone());
229        }
230    }
231
232    let mut out = Vec::with_capacity(nodes_by_id.len());
233    while let Some(id) = queue.pop_front() {
234        let node = nodes_by_id.get(&id).unwrap().clone();
235        out.push(node);
236        for to in edges_from.get(&id).unwrap() {
237            let entry = indegree.get_mut(to).unwrap();
238            *entry -= 1;
239            if *entry == 0 {
240                queue.push_back(to.clone());
241            }
242        }
243    }
244
245    if out.len() != nodes_by_id.len() {
246        let remaining: Vec<StateId> = indegree
247            .into_iter()
248            .filter_map(|(id, deg)| if deg > 0 { Some(id) } else { None })
249            .collect();
250        return Err(PlanValidationError::CircularDependency { cycle: remaining });
251    }
252
253    Ok(out)
254}
255
/// Data captured from the first `RunStarted` kernel event in a run's stream.
#[derive(Clone, Debug)]
struct RunStartedInfo {
    /// Operation the run was started for.
    op_id: OpId,
    /// Artifact id of the persisted run manifest.
    manifest_id: ArtifactId,
    /// Artifact id of the context snapshot taken before any state ran.
    initial_snapshot_id: ArtifactId,
}
262
/// Replayed view of a run's kernel events, used to decide how to resume.
#[derive(Clone, Debug)]
struct RunHistory {
    /// Info from the stream's first `RunStarted` event.
    started: RunStartedInfo,
    /// States that reached `StateCompleted`.
    completed_states: HashSet<StateId>,
    /// Most recent committed context snapshot (the initial snapshot when no
    /// state completed yet).
    last_checkpoint: ArtifactId,
    /// Attempt that was entered but never reached a terminal event, if any
    /// (e.g. crash mid-state), as detected by `analyze_kernel_events`.
    orphan_attempt: Option<OrphanAttempt>,
    /// Most recent `StateFailed` per state.
    last_failure_by_state: HashMap<StateId, (u32, ArtifactId, bool)>, // (attempt, base_snapshot, retryable)
    /// Highest attempt number observed per state via `StateEntered`.
    last_attempt_by_state: HashMap<StateId, u32>,
    /// Terminal status when the stream already contains `RunCompleted`.
    run_completed: Option<(RunStatus, Option<ArtifactId>)>,
}
273
/// Replays a run's event stream into a [`RunHistory`] snapshot.
///
/// Folds kernel events in stream order: the first `RunStarted` fixes the run
/// identity, `StateEntered`/`StateCompleted`/`StateFailed` track attempt
/// progress and the latest checkpoint, and `RunCompleted` records the terminal
/// status. Domain events are ignored. Orphan-attempt detection is delegated to
/// `analyze_kernel_events`.
///
/// # Errors
/// Returns `InvalidPlan` when attempt envelopes are malformed, an event carries
/// a different `run_id`, a `StateFailed` arrives without a matching
/// `StateEntered`, or the stream lacks a `RunStarted` event.
fn read_run_history(run_id: RunId, stream: &[EventEnvelope]) -> Result<RunHistory, RunError> {
    let analysis = analyze_kernel_events(stream).map_err(|_| {
        invalid_plan(
            "invalid_attempt_envelopes",
            "invalid attempt envelopes in event stream",
        )
    })?;

    let mut started: Option<RunStartedInfo> = None;
    let mut completed_states = HashSet::new();
    let mut last_checkpoint: Option<ArtifactId> = None;
    // The currently open (entered but not yet terminal) attempt, if any.
    let mut open_attempt: Option<(StateId, u32, ArtifactId)> = None;
    let mut last_failure_by_state: HashMap<StateId, (u32, ArtifactId, bool)> = HashMap::new();
    let mut last_attempt_by_state: HashMap<StateId, u32> = HashMap::new();
    let mut run_completed: Option<(RunStatus, Option<ArtifactId>)> = None;

    for e in stream {
        if e.run_id != run_id {
            return Err(invalid_plan(
                "run_id_mismatch",
                "event stream run_id mismatch",
            ));
        }

        match &e.event {
            Event::Kernel(ke) => match ke {
                KernelEvent::RunStarted {
                    op_id,
                    manifest_id,
                    initial_snapshot_id,
                } => {
                    // Only the first RunStarted counts; duplicates are ignored.
                    if started.is_none() {
                        started = Some(RunStartedInfo {
                            op_id: op_id.clone(),
                            manifest_id: manifest_id.clone(),
                            initial_snapshot_id: initial_snapshot_id.clone(),
                        });
                        last_checkpoint = Some(initial_snapshot_id.clone());
                    }
                }
                KernelEvent::StateEntered {
                    state_id,
                    attempt,
                    base_snapshot_id,
                } => {
                    open_attempt = Some((state_id.clone(), *attempt, base_snapshot_id.clone()));
                    last_attempt_by_state.insert(state_id.clone(), *attempt);
                }
                KernelEvent::StateCompleted {
                    state_id,
                    context_snapshot_id,
                } => {
                    completed_states.insert(state_id.clone());
                    // Each completion advances the resumable checkpoint.
                    last_checkpoint = Some(context_snapshot_id.clone());
                    open_attempt = None;
                }
                KernelEvent::StateFailed {
                    state_id, error, ..
                } => {
                    // NOTE(review): the open attempt's state id (first tuple
                    // field) is not compared against `state_id`; presumably
                    // the event writer guarantees they match — confirm upstream.
                    let Some((_, attempt, base_snapshot)) = open_attempt.take() else {
                        return Err(invalid_plan(
                            "terminal_without_entered",
                            "state terminal without StateEntered",
                        ));
                    };
                    last_failure_by_state.insert(
                        state_id.clone(),
                        (attempt, base_snapshot, error.info.retryable),
                    );
                }
                KernelEvent::RunCompleted {
                    status,
                    final_snapshot_id,
                } => {
                    run_completed = Some((status.clone(), final_snapshot_id.clone()));
                }
            },
            // Domain events carry no kernel-progress information for resume.
            Event::Domain(_) => {}
        }
    }

    let Some(started) = started else {
        return Err(invalid_plan(
            "missing_run_started",
            "missing RunStarted kernel event",
        ));
    };

    // Fall back to the initial snapshot when no state ever completed.
    let last_checkpoint = last_checkpoint.unwrap_or_else(|| started.initial_snapshot_id.clone());

    Ok(RunHistory {
        started,
        completed_states,
        last_checkpoint,
        orphan_attempt: analysis.orphan_attempt,
        last_failure_by_state,
        last_attempt_by_state,
        run_completed,
    })
}
374
375async fn read_manifest(
376    artifacts: &dyn ArtifactStore,
377    manifest_id: &ArtifactId,
378) -> Result<RunManifest, RunError> {
379    let bytes = artifacts
380        .get(manifest_id)
381        .await
382        .map_err(RunError::Storage)?;
383    let value = serde_json::from_slice::<serde_json::Value>(&bytes).map_err(|_| {
384        RunError::Context(context_err(
385            "manifest_decode_failed",
386            "failed to decode manifest JSON",
387        ))
388    })?;
389
390    // Defense: manifest bytes must match its content hash.
391    let computed = crate::hashing::artifact_id_for_bytes(&bytes);
392    if &computed != manifest_id {
393        return Err(invalid_plan(
394            "manifest_corrupt",
395            "manifest artifact content hash mismatch",
396        ));
397    }
398
399    serde_json::from_value::<RunManifest>(value).map_err(|_| {
400        RunError::Context(context_err(
401            "manifest_deserialize_failed",
402            "failed to deserialize manifest",
403        ))
404    })
405}
406
407fn next_attempt(last_attempt_by_state: &HashMap<StateId, u32>, state_id: &StateId) -> u32 {
408    last_attempt_by_state
409        .get(state_id)
410        .copied()
411        .map(|a| a + 1)
412        .unwrap_or(0)
413}
414
/// Executes the plan's states in topological order, appending kernel events
/// and finalizing the run with a `RunCompleted` event.
///
/// States in `completed_states` are skipped. When `start_at_state` is
/// `Some((state, attempt, base_snapshot))` — the resume path — every state
/// before that one in topological order is skipped, and the named state starts
/// at the given attempt number from the given base snapshot. A failed attempt
/// is retried (with backoff) while the error is retryable and attempts remain;
/// otherwise the run is finalized as failed.
#[allow(clippy::too_many_arguments)]
#[instrument(
    level = "info",
    skip(
        stores,
        plan,
        run_config,
        writer,
        completed_states,
        start_at_state,
        facts,
        live_factory,
        failpoints
    ),
    fields(run_id = %run_id.0, op_id = %plan.op_id)
)]
async fn run_states(
    stores: &Stores,
    plan: &ExecutionPlan,
    run_config: &RunConfig,
    run_id: RunId,
    writer: SharedEventWriter,
    mut current_snapshot_id: ArtifactId,
    completed_states: &HashSet<StateId>,
    start_at_state: Option<(StateId, u32, ArtifactId)>,
    facts: FactIndex,
    live_factory: Arc<dyn LiveIoTransportFactory>,
    failpoints: Option<EngineFailpoints>,
) -> Result<RunResult, RunError> {
    validate_execution_mode(run_config)?;
    debug!(execution_mode = ?run_config.execution_mode, "running execution plan");

    let ordered = topological_order(plan)
        .map_err(|_| invalid_plan("invalid_plan", "execution plan failed validation"))?;
    debug!(
        state_count = ordered.len(),
        "execution plan resolved to topological order"
    );

    // `found_start` gates the skip-until-resume-target behavior; with no
    // resume target it is true from the beginning.
    let mut found_start = start_at_state.is_none();
    let mut phase = RunPhase::Running;

    for node in ordered {
        if completed_states.contains(&node.id) {
            debug!(state_id = %node.id, "state already completed, skipping");
            continue;
        }

        // Pick attempt number and base snapshot for this state: the resume
        // target uses the caller-supplied pair; every other state starts
        // fresh (attempt 0) from the latest snapshot.
        let (state_id, mut attempt, base_snapshot_id) =
            if let Some((sid, att, base)) = &start_at_state {
                if !found_start {
                    if &node.id != sid {
                        continue;
                    }
                    found_start = true;
                }
                if &node.id == sid {
                    (sid.clone(), *att, base.clone())
                } else {
                    (node.id.clone(), 0, current_snapshot_id.clone())
                }
            } else {
                (node.id.clone(), 0, current_snapshot_id.clone())
            };

        let state = Arc::clone(&node.state);
        let state_meta = state.meta();
        info!(state_id = %state_id, attempt, "starting state execution");

        // Attempt loop: stays in this loop until the state completes, retries
        // are exhausted, or a test failpoint halts the engine.
        loop {
            let mut attempt_ctx = attempt::AttemptCtx::new(
                stores,
                run_config,
                run_id,
                state_id.clone(),
                attempt,
                base_snapshot_id.clone(),
                facts.clone(),
                Arc::clone(&writer),
                Arc::clone(&live_factory),
                failpoints.clone(),
                Arc::clone(&state),
                state_meta.clone(),
            );

            match attempt::execute_attempt(&mut attempt_ctx).await? {
                attempt::AttemptExec::Completed { snapshot_id } => {
                    current_snapshot_id = snapshot_id;
                    info!(
                        state_id = %state_id,
                        attempt,
                        snapshot_id = %current_snapshot_id.0,
                        "state execution completed"
                    );
                    break;
                }
                attempt::AttemptExec::StopAfterHandler => {
                    // Test failpoint: return mid-run WITHOUT appending
                    // RunCompleted, leaving the run resumable.
                    warn!(
                        state_id = %state_id,
                        attempt,
                        "execution stopped after handler due to failpoint"
                    );
                    return Ok(RunResult {
                        run_id,
                        phase: RunPhase::Running,
                        final_snapshot_id: Some(current_snapshot_id.clone()),
                    });
                }
                attempt::AttemptExec::Failed { retryable } => {
                    let next = attempt + 1;
                    if retryable && next < run_config.retry_policy.max_attempts {
                        // Backoff is computed from the attempt that just failed.
                        let d = compute_backoff(&run_config.retry_policy.backoff, attempt);
                        warn!(
                            state_id = %state_id,
                            attempt,
                            next_attempt = next,
                            backoff_ms = d.as_millis() as u64,
                            "state failed and will be retried"
                        );
                        if !d.is_zero() {
                            tokio::time::sleep(d).await;
                        }
                        attempt = next;
                        continue;
                    }

                    // Non-retryable error or retry budget exhausted.
                    phase = RunPhase::Failed;
                    warn!(
                        state_id = %state_id,
                        attempt,
                        retryable,
                        max_attempts = run_config.retry_policy.max_attempts,
                        "state failed and no retries remain"
                    );
                    break;
                }
            }
        }

        if phase == RunPhase::Failed {
            break;
        }
    }

    // Map the engine phase to the terminal status recorded in the stream.
    let (status, final_snapshot_id) = match phase {
        RunPhase::Running | RunPhase::Completed => {
            (RunStatus::Completed, Some(current_snapshot_id.clone()))
        }
        RunPhase::Failed => (RunStatus::Failed, Some(current_snapshot_id.clone())),
        RunPhase::Cancelled => (RunStatus::Cancelled, Some(current_snapshot_id.clone())),
    };

    append_kernel(
        &writer,
        KernelEvent::RunCompleted {
            status: status.clone(),
            final_snapshot_id: final_snapshot_id.clone(),
        },
    )
    .await?;
    info!(
        phase = ?phase,
        final_snapshot_id = final_snapshot_id.as_ref().map(|id| id.0.as_str()),
        "run completed and finalized"
    );

    Ok(RunResult {
        run_id,
        phase: match status {
            RunStatus::Completed => RunPhase::Completed,
            RunStatus::Failed => RunPhase::Failed,
            RunStatus::Cancelled => RunPhase::Cancelled,
        },
        final_snapshot_id,
    })
}
591
#[async_trait]
impl ExecutionEngine for DefaultExecutionEngine {
    /// Starts a new run: validates the contract, persists the initial context
    /// snapshot, appends `RunStarted`, and executes all states from scratch.
    #[instrument(level = "info", skip(self, stores, run), fields(op_id = %run.plan.op_id))]
    async fn start(&self, stores: Stores, run: StartRun) -> Result<RunResult, RunError> {
        validate_execution_mode(&run.run_config)?;
        validate_start_run_contract(&run)?;

        // The manifest must already be persisted by the caller; the engine
        // never writes it.
        let exists = stores
            .artifacts
            .exists(&run.manifest_id)
            .await
            .map_err(RunError::Storage)?;
        if !exists {
            return Err(RunError::Storage(storage_not_found(
                "manifest_not_found",
                "manifest artifact was not found",
            )));
        }
        info!(manifest_id = %run.manifest_id.0, "starting run");

        let run_id = RunId(uuid::Uuid::new_v4());

        // Store initial context snapshot as an artifact.
        let initial_snapshot = run.initial_context.dump().map_err(RunError::Context)?;
        let initial_snapshot_id =
            write_full_snapshot_value(stores.artifacts.as_ref(), initial_snapshot).await?;

        let writer: SharedEventWriter = Arc::new(Mutex::new(
            EventWriter::new(Arc::clone(&stores.events), run_id)
                .await
                .map_err(RunError::Storage)?,
        ));

        writer
            .lock()
            .await
            .append_kernel(KernelEvent::RunStarted {
                op_id: run.plan.op_id.clone(),
                manifest_id: run.manifest_id.clone(),
                initial_snapshot_id: initial_snapshot_id.clone(),
            })
            .await
            .map_err(RunError::Storage)?;
        info!(
            run_id = %run_id.0,
            initial_snapshot_id = %initial_snapshot_id.0,
            "run started event appended"
        );

        // Fresh run: nothing completed yet, no facts to rehydrate.
        let completed_states = HashSet::new();
        let current_snapshot_id = initial_snapshot_id.clone();
        let facts = FactIndex::default();

        run_states(
            &stores,
            &run.plan,
            &run.run_config,
            run_id,
            writer,
            current_snapshot_id,
            &completed_states,
            None,
            facts,
            Arc::clone(&self.live_transport_factory),
            self.failpoints.clone(),
        )
        .await
    }

    /// Resumes an existing run by replaying its event stream and continuing
    /// from an orphaned attempt, a retryable failure, or the next incomplete
    /// state — in that priority order.
    #[instrument(level = "info", skip(self, stores), fields(run_id = %run_id.0))]
    async fn resume(&self, stores: Stores, run_id: RunId) -> Result<RunResult, RunError> {
        let head = stores
            .events
            .head_seq(run_id)
            .await
            .map_err(RunError::Storage)?;
        // head_seq == 0 means no events were ever appended for this run id.
        if head == 0 {
            return Err(RunError::Storage(storage_not_found(
                "run_not_found",
                "run event stream was not found",
            )));
        }
        debug!(head_seq = head, "resuming run from event stream");

        let stream = stores
            .events
            .read_range(run_id, 1, None)
            .await
            .map_err(RunError::Storage)?;

        let facts = FactIndex::from_event_stream(&stream);
        let history = read_run_history(run_id, &stream)?;
        debug!(
            completed_state_count = history.completed_states.len(),
            "loaded run history for resume"
        );

        // Idempotent resume: a terminal run just reports its recorded outcome.
        if let Some((status, final_snapshot_id)) = &history.run_completed {
            info!(
                status = ?status,
                final_snapshot_id = final_snapshot_id.as_ref().map(|id| id.0.as_str()),
                "run already completed; resume returns existing terminal state"
            );
            return Ok(RunResult {
                run_id,
                phase: match status {
                    RunStatus::Completed => RunPhase::Completed,
                    RunStatus::Failed => RunPhase::Failed,
                    RunStatus::Cancelled => RunPhase::Cancelled,
                },
                final_snapshot_id: final_snapshot_id.clone(),
            });
        }

        let manifest =
            read_manifest(stores.artifacts.as_ref(), &history.started.manifest_id).await?;
        validate_execution_mode(&manifest.run_config)?;

        if history.started.op_id != manifest.op_id {
            return Err(invalid_plan(
                "run_started_op_id_mismatch",
                "RunStarted.op_id did not match manifest.op_id",
            ));
        }
        debug!(op_id = %manifest.op_id, "manifest loaded for resume");

        let plan = self.resolver.resolve(&manifest)?;
        if plan.op_id != manifest.op_id {
            return Err(invalid_plan(
                "plan_op_id_mismatch",
                "resolved plan.op_id did not match manifest.op_id",
            ));
        }

        let writer: SharedEventWriter = Arc::new(Mutex::new(
            EventWriter::new(Arc::clone(&stores.events), run_id)
                .await
                .map_err(RunError::Storage)?,
        ));

        // Orphan attempt handling: retry from base snapshot with attempt+1.
        // NOTE(review): the bumped attempt number is not checked against
        // max_attempts here — a crashed attempt always gets one more try;
        // confirm this is the intended crash semantics.
        if let Some(orphan) = &history.orphan_attempt {
            warn!(
                state_id = %orphan.state_id,
                previous_attempt = orphan.attempt,
                "retrying orphan attempt from base snapshot"
            );
            let start = (
                orphan.state_id.clone(),
                orphan.attempt + 1,
                orphan.base_snapshot_id.clone(),
            );
            return run_states(
                &stores,
                &plan,
                &manifest.run_config,
                run_id,
                writer,
                history.last_checkpoint.clone(),
                &history.completed_states,
                Some(start),
                facts.clone(),
                Arc::clone(&self.live_transport_factory),
                self.failpoints.clone(),
            )
            .await;
        }

        // If all states are done, finalize run.
        let ordered = topological_order(&plan)
            .map_err(|_| invalid_plan("invalid_plan", "execution plan failed validation"))?;
        let next_state = ordered
            .iter()
            .find(|n| !history.completed_states.contains(&n.id))
            .map(|n| n.id.clone());

        let Some(next_state_id) = next_state else {
            info!("all states already completed; finalizing run");
            writer
                .lock()
                .await
                .append_kernel(KernelEvent::RunCompleted {
                    status: RunStatus::Completed,
                    final_snapshot_id: Some(history.last_checkpoint.clone()),
                })
                .await
                .map_err(RunError::Storage)?;
            return Ok(RunResult {
                run_id,
                phase: RunPhase::Completed,
                final_snapshot_id: Some(history.last_checkpoint.clone()),
            });
        };

        // Retry a previously failed state if retries remain; otherwise finalize failed run.
        if let Some((attempt, base_snapshot, retryable)) =
            history.last_failure_by_state.get(&next_state_id)
        {
            let next = attempt + 1;
            if !*retryable || next >= manifest.run_config.retry_policy.max_attempts {
                warn!(
                    state_id = %next_state_id,
                    attempt = *attempt,
                    retryable = *retryable,
                    max_attempts = manifest.run_config.retry_policy.max_attempts,
                    "resume cannot retry failed state; finalizing run as failed"
                );
                writer
                    .lock()
                    .await
                    .append_kernel(KernelEvent::RunCompleted {
                        status: RunStatus::Failed,
                        final_snapshot_id: Some(history.last_checkpoint.clone()),
                    })
                    .await
                    .map_err(RunError::Storage)?;
                return Ok(RunResult {
                    run_id,
                    phase: RunPhase::Failed,
                    final_snapshot_id: Some(history.last_checkpoint.clone()),
                });
            }

            info!(
                state_id = %next_state_id,
                next_attempt = next,
                base_snapshot_id = %base_snapshot.0,
                "resuming from failed state with retry"
            );
            let start = (next_state_id.clone(), next, base_snapshot.clone());
            return run_states(
                &stores,
                &plan,
                &manifest.run_config,
                run_id,
                writer,
                history.last_checkpoint.clone(),
                &history.completed_states,
                Some(start),
                facts.clone(),
                Arc::clone(&self.live_transport_factory),
                self.failpoints.clone(),
            )
            .await;
        }

        // Normal path: continue at the next incomplete state from the last
        // committed checkpoint.
        let start = (
            next_state_id.clone(),
            next_attempt(&history.last_attempt_by_state, &next_state_id),
            history.last_checkpoint.clone(),
        );
        info!(
            state_id = %next_state_id,
            attempt = start.1,
            base_snapshot_id = %history.last_checkpoint.0,
            "resuming run at next state"
        );
        run_states(
            &stores,
            &plan,
            &manifest.run_config,
            run_id,
            writer,
            history.last_checkpoint.clone(),
            &history.completed_states,
            Some(start),
            facts,
            Arc::clone(&self.live_transport_factory),
            self.failpoints.clone(),
        )
        .await
    }
}
865
// Unit tests live in a sibling file to keep this module focused.
#[cfg(test)]
#[path = "tests/runtime_tests.rs"]
mod runtime_tests;