1use std::collections::{HashMap, HashSet, VecDeque};
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use tokio::sync::Mutex;
12use tracing::{debug, info, instrument, warn};
13
14use crate::attempt_envelope::{analyze_kernel_events, OrphanAttempt};
15use crate::config::{BackoffPolicy, ExecutionMode, RunConfig, RunManifest};
16use crate::context_runtime::write_full_snapshot_value;
17use crate::engine::{ExecutionEngine, RunPhase, RunResult, StartRun, Stores};
18use crate::errors::{ContextError, ErrorCategory, ErrorInfo, RunError, StorageError};
19use crate::events::{Event, EventEnvelope, KernelEvent, RunStatus};
20use crate::hashing::artifact_id_for_json;
21use crate::ids::{ArtifactId, ErrorCode, OpId, RunId, StateId};
22use crate::live_io::{FactIndex, LiveIoTransportFactory, UnimplementedLiveIoTransportFactory};
23use crate::plan::{DependencyEdge, ExecutionPlan, PlanValidationError, StateNode};
24use crate::stores::ArtifactStore;
25
26mod attempt;
27mod child_runs;
28mod writer;
29
30use writer::{append_kernel, EventWriter, SharedEventWriter};
31
32pub use child_runs::ChildRunLiveIoTransportFactory;
33
34const CODE_UNSUPPORTED_EXECUTION_MODE: &str = "unsupported_execution_mode";
35
/// Resolves a concrete `ExecutionPlan` from a persisted `RunManifest`.
///
/// Implementations should be deterministic: `resume` re-resolves the plan
/// from the stored manifest and cross-checks `plan.op_id` against
/// `manifest.op_id`, so a resolver that produces a different plan for the
/// same manifest will fail resumption.
pub trait PlanResolver: Send + Sync {
    /// Builds the execution plan for `manifest`, or a `RunError` if the
    /// manifest cannot be mapped to a valid plan.
    fn resolve(&self, manifest: &RunManifest) -> Result<ExecutionPlan, RunError>;
}
44
/// Fault-injection switches for exercising failure/recovery paths in tests.
#[derive(Clone, Default)]
pub struct EngineFailpoints {
    /// One-shot flag consumed by `should_stop_after_handler`: when set, the
    /// run stops early (phase `Running`) after a state handler executes, and
    /// the flag is atomically cleared so it fires at most once.
    pub stop_after_handler_once: Arc<std::sync::atomic::AtomicBool>,
}
53
54impl EngineFailpoints {
55 fn should_stop_after_handler(&self) -> bool {
56 self.stop_after_handler_once
57 .swap(false, std::sync::atomic::Ordering::SeqCst)
58 }
59}
60
/// Default sequential execution engine.
///
/// Drives an `ExecutionPlan` state-by-state, appending kernel events to the
/// run's event stream so `resume` can replay history and continue a run.
#[derive(Clone)]
pub struct DefaultExecutionEngine {
    // Maps a persisted manifest back to an executable plan (used on resume).
    resolver: Arc<dyn PlanResolver>,
    // Factory for the live-I/O transports handed to state attempts;
    // defaults to `UnimplementedLiveIoTransportFactory`.
    live_transport_factory: Arc<dyn LiveIoTransportFactory>,
    // Optional fault-injection hooks; `None` unless set via `with_failpoints`.
    failpoints: Option<EngineFailpoints>,
}
71
72impl DefaultExecutionEngine {
73 pub fn new(resolver: Arc<dyn PlanResolver>) -> Self {
75 Self {
76 resolver,
77 live_transport_factory: Arc::new(UnimplementedLiveIoTransportFactory),
78 failpoints: None,
79 }
80 }
81
82 pub fn with_live_transport_factory(mut self, factory: Arc<dyn LiveIoTransportFactory>) -> Self {
84 self.live_transport_factory = factory;
85 self
86 }
87
88 pub fn with_failpoints(mut self, failpoints: EngineFailpoints) -> Self {
90 self.failpoints = Some(failpoints);
91 self
92 }
93}
94
95fn info(code: &'static str, category: ErrorCategory, message: &'static str) -> ErrorInfo {
96 ErrorInfo {
97 code: ErrorCode(code.to_string()),
98 category,
99 retryable: false,
100 message: message.to_string(),
101 details: None,
102 }
103}
104
/// Shorthand for a non-retryable `RunError::InvalidPlan` with category
/// `Unknown`.
fn invalid_plan(code: &'static str, message: &'static str) -> RunError {
    RunError::InvalidPlan(info(code, ErrorCategory::Unknown, message))
}
108
/// Shorthand for a non-retryable `StorageError::NotFound` with category
/// `Storage`.
fn storage_not_found(code: &'static str, message: &'static str) -> StorageError {
    StorageError::NotFound(info(code, ErrorCategory::Storage, message))
}
112
/// Shorthand for a non-retryable `ContextError::Serialization` with category
/// `Context`.
fn context_err(code: &'static str, message: &'static str) -> ContextError {
    ContextError::Serialization(info(code, ErrorCategory::Context, message))
}
116
117fn compute_backoff(policy: &BackoffPolicy, attempt: u32) -> Duration {
118 match policy {
119 BackoffPolicy::Fixed { delay } => *delay,
120 BackoffPolicy::Exponential {
121 base_delay,
122 max_delay,
123 } => {
124 let shift = attempt.min(31);
125 let factor = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
126 let scaled = base_delay.saturating_mul(factor);
127 if &scaled > max_delay {
128 *max_delay
129 } else {
130 scaled
131 }
132 }
133 }
134}
135
136fn validate_execution_mode(cfg: &RunConfig) -> Result<(), RunError> {
137 match cfg.execution_mode {
138 ExecutionMode::Sequential => Ok(()),
139 ExecutionMode::FanOutJoin { .. } => Err(RunError::InvalidPlan(info(
140 CODE_UNSUPPORTED_EXECUTION_MODE,
141 ErrorCategory::Unknown,
142 "execution_mode FanOutJoin is not supported",
143 ))),
144 }
145}
146
147fn validate_start_run_contract(run: &StartRun) -> Result<(), RunError> {
148 let value = serde_json::to_value(&run.manifest).map_err(|_| {
149 invalid_plan(
150 "manifest_serialize_failed",
151 "failed to serialize run manifest",
152 )
153 })?;
154 let computed = artifact_id_for_json(&value).map_err(|e| match e {
155 crate::hashing::CanonicalJsonError::FloatNotAllowed => invalid_plan(
156 "manifest_not_canonical",
157 "run manifest is not canonical-json-hashable (floats are forbidden)",
158 ),
159 crate::hashing::CanonicalJsonError::SecretsNotAllowed => invalid_plan(
160 "secrets_detected",
161 "run manifest contained secrets (policy forbids persisting secrets)",
162 ),
163 })?;
164 if computed != run.manifest_id {
165 return Err(invalid_plan(
166 "manifest_id_mismatch",
167 "manifest_id did not match canonical JSON hash of the manifest",
168 ));
169 }
170
171 if run.manifest.op_id != run.plan.op_id {
172 return Err(invalid_plan(
173 "manifest_op_id_mismatch",
174 "manifest.op_id did not match plan.op_id",
175 ));
176 }
177
178 if run.manifest.run_config != run.run_config {
179 return Err(invalid_plan(
180 "run_config_mismatch",
181 "run_config did not match manifest.run_config",
182 ));
183 }
184
185 Ok(())
186}
187
188fn topological_order(plan: &ExecutionPlan) -> Result<Vec<StateNode>, PlanValidationError> {
189 if plan.graph.states.is_empty() {
190 return Err(PlanValidationError::EmptyPlan);
191 }
192
193 let mut nodes_by_id: HashMap<StateId, StateNode> = HashMap::new();
194 for n in &plan.graph.states {
195 if nodes_by_id.contains_key(&n.id) {
196 return Err(PlanValidationError::DuplicateStateId {
197 state_id: n.id.clone(),
198 });
199 }
200 nodes_by_id.insert(n.id.clone(), n.clone());
201 }
202
203 let mut indegree: HashMap<StateId, usize> = HashMap::new();
204 let mut edges_from: HashMap<StateId, Vec<StateId>> = HashMap::new();
205 for id in nodes_by_id.keys() {
206 indegree.insert(id.clone(), 0);
207 edges_from.insert(id.clone(), Vec::new());
208 }
209
210 for DependencyEdge { from, to } in &plan.graph.edges {
211 if !nodes_by_id.contains_key(from) {
212 return Err(PlanValidationError::MissingStateForEdge {
213 missing: from.clone(),
214 });
215 }
216 if !nodes_by_id.contains_key(to) {
217 return Err(PlanValidationError::MissingStateForEdge {
218 missing: to.clone(),
219 });
220 }
221 edges_from.get_mut(from).unwrap().push(to.clone());
222 *indegree.get_mut(to).unwrap() += 1;
223 }
224
225 let mut queue = VecDeque::new();
226 for n in &plan.graph.states {
227 if indegree.get(&n.id).copied().unwrap_or(0) == 0 {
228 queue.push_back(n.id.clone());
229 }
230 }
231
232 let mut out = Vec::with_capacity(nodes_by_id.len());
233 while let Some(id) = queue.pop_front() {
234 let node = nodes_by_id.get(&id).unwrap().clone();
235 out.push(node);
236 for to in edges_from.get(&id).unwrap() {
237 let entry = indegree.get_mut(to).unwrap();
238 *entry -= 1;
239 if *entry == 0 {
240 queue.push_back(to.clone());
241 }
242 }
243 }
244
245 if out.len() != nodes_by_id.len() {
246 let remaining: Vec<StateId> = indegree
247 .into_iter()
248 .filter_map(|(id, deg)| if deg > 0 { Some(id) } else { None })
249 .collect();
250 return Err(PlanValidationError::CircularDependency { cycle: remaining });
251 }
252
253 Ok(out)
254}
255
/// Payload of the first `RunStarted` kernel event, captured during replay.
#[derive(Clone, Debug)]
struct RunStartedInfo {
    // Operation the run was started for; cross-checked against the manifest.
    op_id: OpId,
    // Artifact id of the persisted run manifest.
    manifest_id: ArtifactId,
    // Snapshot of the context the run began with.
    initial_snapshot_id: ArtifactId,
}
262
/// View of a run reconstructed by replaying its kernel event stream;
/// consumed by `resume` to decide where to pick up execution.
#[derive(Clone, Debug)]
struct RunHistory {
    // Data from the first `RunStarted` event.
    started: RunStartedInfo,
    // States that reached `StateCompleted`.
    completed_states: HashSet<StateId>,
    // Most recent context snapshot (falls back to the initial snapshot).
    last_checkpoint: ArtifactId,
    // Attempt that was entered but never reached a terminal event, if any.
    orphan_attempt: Option<OrphanAttempt>,
    // Per-state last failure: (attempt number, base snapshot, retryable).
    last_failure_by_state: HashMap<StateId, (u32, ArtifactId, bool)>,
    // Latest attempt number observed per state (from `StateEntered`).
    last_attempt_by_state: HashMap<StateId, u32>,
    // Terminal `RunCompleted` payload, if the run already finished.
    run_completed: Option<(RunStatus, Option<ArtifactId>)>,
}
273
/// Rebuilds a `RunHistory` by replaying the run's kernel events in stream
/// order.
///
/// Domain events are ignored. Fails if the stream mixes run ids, contains
/// invalid attempt envelopes, lacks a `RunStarted` event, or reports a
/// `StateFailed` without a matching open `StateEntered`.
fn read_run_history(run_id: RunId, stream: &[EventEnvelope]) -> Result<RunHistory, RunError> {
    // Envelope-level validation (and orphan-attempt detection) is delegated
    // to the attempt-envelope analyzer.
    let analysis = analyze_kernel_events(stream).map_err(|_| {
        invalid_plan(
            "invalid_attempt_envelopes",
            "invalid attempt envelopes in event stream",
        )
    })?;

    let mut started: Option<RunStartedInfo> = None;
    let mut completed_states = HashSet::new();
    let mut last_checkpoint: Option<ArtifactId> = None;
    // The attempt currently "in flight": set by StateEntered, cleared by a
    // terminal event (StateCompleted / StateFailed).
    let mut open_attempt: Option<(StateId, u32, ArtifactId)> = None;
    let mut last_failure_by_state: HashMap<StateId, (u32, ArtifactId, bool)> = HashMap::new();
    let mut last_attempt_by_state: HashMap<StateId, u32> = HashMap::new();
    let mut run_completed: Option<(RunStatus, Option<ArtifactId>)> = None;

    for e in stream {
        // Every envelope must belong to the run being replayed.
        if e.run_id != run_id {
            return Err(invalid_plan(
                "run_id_mismatch",
                "event stream run_id mismatch",
            ));
        }

        match &e.event {
            Event::Kernel(ke) => match ke {
                KernelEvent::RunStarted {
                    op_id,
                    manifest_id,
                    initial_snapshot_id,
                } => {
                    // Only the first RunStarted counts; later ones are ignored.
                    if started.is_none() {
                        started = Some(RunStartedInfo {
                            op_id: op_id.clone(),
                            manifest_id: manifest_id.clone(),
                            initial_snapshot_id: initial_snapshot_id.clone(),
                        });
                        last_checkpoint = Some(initial_snapshot_id.clone());
                    }
                }
                KernelEvent::StateEntered {
                    state_id,
                    attempt,
                    base_snapshot_id,
                } => {
                    open_attempt = Some((state_id.clone(), *attempt, base_snapshot_id.clone()));
                    last_attempt_by_state.insert(state_id.clone(), *attempt);
                }
                KernelEvent::StateCompleted {
                    state_id,
                    context_snapshot_id,
                } => {
                    // Completion advances the checkpoint and closes the open
                    // attempt.
                    completed_states.insert(state_id.clone());
                    last_checkpoint = Some(context_snapshot_id.clone());
                    open_attempt = None;
                }
                KernelEvent::StateFailed {
                    state_id, error, ..
                } => {
                    // A failure must close an open attempt; record its attempt
                    // number and base snapshot so resume can retry from it.
                    let Some((_, attempt, base_snapshot)) = open_attempt.take() else {
                        return Err(invalid_plan(
                            "terminal_without_entered",
                            "state terminal without StateEntered",
                        ));
                    };
                    last_failure_by_state.insert(
                        state_id.clone(),
                        (attempt, base_snapshot, error.info.retryable),
                    );
                }
                KernelEvent::RunCompleted {
                    status,
                    final_snapshot_id,
                } => {
                    run_completed = Some((status.clone(), final_snapshot_id.clone()));
                }
            },
            // Domain events carry no kernel-level state transitions.
            Event::Domain(_) => {}
        }
    }

    let Some(started) = started else {
        return Err(invalid_plan(
            "missing_run_started",
            "missing RunStarted kernel event",
        ));
    };

    // RunStarted always seeds last_checkpoint, so this fallback is defensive.
    let last_checkpoint = last_checkpoint.unwrap_or_else(|| started.initial_snapshot_id.clone());

    Ok(RunHistory {
        started,
        completed_states,
        last_checkpoint,
        orphan_attempt: analysis.orphan_attempt,
        last_failure_by_state,
        last_attempt_by_state,
        run_completed,
    })
}
374
375async fn read_manifest(
376 artifacts: &dyn ArtifactStore,
377 manifest_id: &ArtifactId,
378) -> Result<RunManifest, RunError> {
379 let bytes = artifacts
380 .get(manifest_id)
381 .await
382 .map_err(RunError::Storage)?;
383 let value = serde_json::from_slice::<serde_json::Value>(&bytes).map_err(|_| {
384 RunError::Context(context_err(
385 "manifest_decode_failed",
386 "failed to decode manifest JSON",
387 ))
388 })?;
389
390 let computed = crate::hashing::artifact_id_for_bytes(&bytes);
392 if &computed != manifest_id {
393 return Err(invalid_plan(
394 "manifest_corrupt",
395 "manifest artifact content hash mismatch",
396 ));
397 }
398
399 serde_json::from_value::<RunManifest>(value).map_err(|_| {
400 RunError::Context(context_err(
401 "manifest_deserialize_failed",
402 "failed to deserialize manifest",
403 ))
404 })
405}
406
407fn next_attempt(last_attempt_by_state: &HashMap<StateId, u32>, state_id: &StateId) -> u32 {
408 last_attempt_by_state
409 .get(state_id)
410 .copied()
411 .map(|a| a + 1)
412 .unwrap_or(0)
413}
414
/// Executes the plan's states sequentially from `current_snapshot_id` and
/// finalizes the run with a `RunCompleted` kernel event.
///
/// States in `completed_states` are skipped. `start_at_state` (used on
/// resume) names the state to begin at with an explicit attempt number and
/// base snapshot; states sorting before it in topological order are skipped.
/// Each state is retried per `run_config.retry_policy` while its failures
/// are retryable; the first exhausted/non-retryable failure ends the run as
/// `Failed`. If a failpoint fires, the function returns early with phase
/// `Running` and *no* `RunCompleted` event, leaving the run resumable.
#[allow(clippy::too_many_arguments)]
#[instrument(
    level = "info",
    skip(
        stores,
        plan,
        run_config,
        writer,
        completed_states,
        start_at_state,
        facts,
        live_factory,
        failpoints
    ),
    fields(run_id = %run_id.0, op_id = %plan.op_id)
)]
async fn run_states(
    stores: &Stores,
    plan: &ExecutionPlan,
    run_config: &RunConfig,
    run_id: RunId,
    writer: SharedEventWriter,
    mut current_snapshot_id: ArtifactId,
    completed_states: &HashSet<StateId>,
    start_at_state: Option<(StateId, u32, ArtifactId)>,
    facts: FactIndex,
    live_factory: Arc<dyn LiveIoTransportFactory>,
    failpoints: Option<EngineFailpoints>,
) -> Result<RunResult, RunError> {
    validate_execution_mode(run_config)?;
    debug!(execution_mode = ?run_config.execution_mode, "running execution plan");

    let ordered = topological_order(plan)
        .map_err(|_| invalid_plan("invalid_plan", "execution plan failed validation"))?;
    debug!(
        state_count = ordered.len(),
        "execution plan resolved to topological order"
    );

    // When resuming mid-run, skip ahead until the designated start state.
    let mut found_start = start_at_state.is_none();
    let mut phase = RunPhase::Running;

    for node in ordered {
        if completed_states.contains(&node.id) {
            debug!(state_id = %node.id, "state already completed, skipping");
            continue;
        }

        // Pick the starting attempt/base snapshot for this state: the resume
        // target uses its recorded attempt and base snapshot; every other
        // state starts fresh at attempt 0 from the current snapshot.
        let (state_id, mut attempt, base_snapshot_id) =
            if let Some((sid, att, base)) = &start_at_state {
                if !found_start {
                    if &node.id != sid {
                        continue;
                    }
                    found_start = true;
                }
                if &node.id == sid {
                    (sid.clone(), *att, base.clone())
                } else {
                    (node.id.clone(), 0, current_snapshot_id.clone())
                }
            } else {
                (node.id.clone(), 0, current_snapshot_id.clone())
            };

        let state = Arc::clone(&node.state);
        let state_meta = state.meta();
        info!(state_id = %state_id, attempt, "starting state execution");

        // Attempt loop: retry the state until it completes, exhausts its
        // retry budget, or fails non-retryably.
        loop {
            let mut attempt_ctx = attempt::AttemptCtx::new(
                stores,
                run_config,
                run_id,
                state_id.clone(),
                attempt,
                base_snapshot_id.clone(),
                facts.clone(),
                Arc::clone(&writer),
                Arc::clone(&live_factory),
                failpoints.clone(),
                Arc::clone(&state),
                state_meta.clone(),
            );

            match attempt::execute_attempt(&mut attempt_ctx).await? {
                attempt::AttemptExec::Completed { snapshot_id } => {
                    // Success: the new snapshot becomes the base for the
                    // next state.
                    current_snapshot_id = snapshot_id;
                    info!(
                        state_id = %state_id,
                        attempt,
                        snapshot_id = %current_snapshot_id.0,
                        "state execution completed"
                    );
                    break;
                }
                attempt::AttemptExec::StopAfterHandler => {
                    // Failpoint fired: exit without appending RunCompleted
                    // so the run stays resumable.
                    warn!(
                        state_id = %state_id,
                        attempt,
                        "execution stopped after handler due to failpoint"
                    );
                    return Ok(RunResult {
                        run_id,
                        phase: RunPhase::Running,
                        final_snapshot_id: Some(current_snapshot_id.clone()),
                    });
                }
                attempt::AttemptExec::Failed { retryable } => {
                    let next = attempt + 1;
                    if retryable && next < run_config.retry_policy.max_attempts {
                        // Back off (based on the attempt that just failed),
                        // then retry from the same base snapshot.
                        let d = compute_backoff(&run_config.retry_policy.backoff, attempt);
                        warn!(
                            state_id = %state_id,
                            attempt,
                            next_attempt = next,
                            backoff_ms = d.as_millis() as u64,
                            "state failed and will be retried"
                        );
                        if !d.is_zero() {
                            tokio::time::sleep(d).await;
                        }
                        attempt = next;
                        continue;
                    }

                    // No retries remain (or the error is not retryable):
                    // the run fails.
                    phase = RunPhase::Failed;
                    warn!(
                        state_id = %state_id,
                        attempt,
                        retryable,
                        max_attempts = run_config.retry_policy.max_attempts,
                        "state failed and no retries remain"
                    );
                    break;
                }
            }
        }

        if phase == RunPhase::Failed {
            break;
        }
    }

    // Map the final phase to a terminal status; all outcomes carry the
    // latest snapshot.
    let (status, final_snapshot_id) = match phase {
        RunPhase::Running | RunPhase::Completed => {
            (RunStatus::Completed, Some(current_snapshot_id.clone()))
        }
        RunPhase::Failed => (RunStatus::Failed, Some(current_snapshot_id.clone())),
        RunPhase::Cancelled => (RunStatus::Cancelled, Some(current_snapshot_id.clone())),
    };

    // Persist the terminal event before reporting the result.
    append_kernel(
        &writer,
        KernelEvent::RunCompleted {
            status: status.clone(),
            final_snapshot_id: final_snapshot_id.clone(),
        },
    )
    .await?;
    info!(
        phase = ?phase,
        final_snapshot_id = final_snapshot_id.as_ref().map(|id| id.0.as_str()),
        "run completed and finalized"
    );

    Ok(RunResult {
        run_id,
        phase: match status {
            RunStatus::Completed => RunPhase::Completed,
            RunStatus::Failed => RunPhase::Failed,
            RunStatus::Cancelled => RunPhase::Cancelled,
        },
        final_snapshot_id,
    })
}
591
#[async_trait]
impl ExecutionEngine for DefaultExecutionEngine {
    /// Starts a brand-new run: validates the request, persists the initial
    /// context snapshot, appends `RunStarted`, then executes all states.
    #[instrument(level = "info", skip(self, stores, run), fields(op_id = %run.plan.op_id))]
    async fn start(&self, stores: Stores, run: StartRun) -> Result<RunResult, RunError> {
        validate_execution_mode(&run.run_config)?;
        validate_start_run_contract(&run)?;

        // The manifest artifact must already be persisted before a run may
        // reference it.
        let exists = stores
            .artifacts
            .exists(&run.manifest_id)
            .await
            .map_err(RunError::Storage)?;
        if !exists {
            return Err(RunError::Storage(storage_not_found(
                "manifest_not_found",
                "manifest artifact was not found",
            )));
        }
        info!(manifest_id = %run.manifest_id.0, "starting run");

        let run_id = RunId(uuid::Uuid::new_v4());

        // Persist the initial context so resume can always recover a base
        // snapshot.
        let initial_snapshot = run.initial_context.dump().map_err(RunError::Context)?;
        let initial_snapshot_id =
            write_full_snapshot_value(stores.artifacts.as_ref(), initial_snapshot).await?;

        let writer: SharedEventWriter = Arc::new(Mutex::new(
            EventWriter::new(Arc::clone(&stores.events), run_id)
                .await
                .map_err(RunError::Storage)?,
        ));

        // RunStarted is the first event in the stream; read_run_history
        // requires it on resume.
        writer
            .lock()
            .await
            .append_kernel(KernelEvent::RunStarted {
                op_id: run.plan.op_id.clone(),
                manifest_id: run.manifest_id.clone(),
                initial_snapshot_id: initial_snapshot_id.clone(),
            })
            .await
            .map_err(RunError::Storage)?;
        info!(
            run_id = %run_id.0,
            initial_snapshot_id = %initial_snapshot_id.0,
            "run started event appended"
        );

        // Fresh run: nothing completed yet, no resume target, empty facts.
        let completed_states = HashSet::new();
        let current_snapshot_id = initial_snapshot_id.clone();
        let facts = FactIndex::default();

        run_states(
            &stores,
            &run.plan,
            &run.run_config,
            run_id,
            writer,
            current_snapshot_id,
            &completed_states,
            None,
            facts,
            Arc::clone(&self.live_transport_factory),
            self.failpoints.clone(),
        )
        .await
    }

    /// Resumes an existing run by replaying its event stream and continuing
    /// from the last durable point: an orphaned attempt, a retryable failed
    /// state, or the next not-yet-completed state. Already-terminal runs
    /// return their recorded result without re-executing anything.
    #[instrument(level = "info", skip(self, stores), fields(run_id = %run_id.0))]
    async fn resume(&self, stores: Stores, run_id: RunId) -> Result<RunResult, RunError> {
        // head_seq == 0 means no events were ever written for this run id.
        let head = stores
            .events
            .head_seq(run_id)
            .await
            .map_err(RunError::Storage)?;
        if head == 0 {
            return Err(RunError::Storage(storage_not_found(
                "run_not_found",
                "run event stream was not found",
            )));
        }
        debug!(head_seq = head, "resuming run from event stream");

        let stream = stores
            .events
            .read_range(run_id, 1, None)
            .await
            .map_err(RunError::Storage)?;

        let facts = FactIndex::from_event_stream(&stream);
        let history = read_run_history(run_id, &stream)?;
        debug!(
            completed_state_count = history.completed_states.len(),
            "loaded run history for resume"
        );

        // Terminal runs are idempotent: return the recorded outcome.
        if let Some((status, final_snapshot_id)) = &history.run_completed {
            info!(
                status = ?status,
                final_snapshot_id = final_snapshot_id.as_ref().map(|id| id.0.as_str()),
                "run already completed; resume returns existing terminal state"
            );
            return Ok(RunResult {
                run_id,
                phase: match status {
                    RunStatus::Completed => RunPhase::Completed,
                    RunStatus::Failed => RunPhase::Failed,
                    RunStatus::Cancelled => RunPhase::Cancelled,
                },
                final_snapshot_id: final_snapshot_id.clone(),
            });
        }

        // Re-load the manifest and re-validate it against recorded history.
        let manifest =
            read_manifest(stores.artifacts.as_ref(), &history.started.manifest_id).await?;
        validate_execution_mode(&manifest.run_config)?;

        if history.started.op_id != manifest.op_id {
            return Err(invalid_plan(
                "run_started_op_id_mismatch",
                "RunStarted.op_id did not match manifest.op_id",
            ));
        }
        debug!(op_id = %manifest.op_id, "manifest loaded for resume");

        // Re-resolve the plan; the resolver must reproduce the same op.
        let plan = self.resolver.resolve(&manifest)?;
        if plan.op_id != manifest.op_id {
            return Err(invalid_plan(
                "plan_op_id_mismatch",
                "resolved plan.op_id did not match manifest.op_id",
            ));
        }

        let writer: SharedEventWriter = Arc::new(Mutex::new(
            EventWriter::new(Arc::clone(&stores.events), run_id)
                .await
                .map_err(RunError::Storage)?,
        ));

        // Case 1: an attempt was entered but never reached a terminal event
        // (e.g. a crash mid-attempt). Retry it from its base snapshot with
        // the next attempt number.
        if let Some(orphan) = &history.orphan_attempt {
            warn!(
                state_id = %orphan.state_id,
                previous_attempt = orphan.attempt,
                "retrying orphan attempt from base snapshot"
            );
            let start = (
                orphan.state_id.clone(),
                orphan.attempt + 1,
                orphan.base_snapshot_id.clone(),
            );
            return run_states(
                &stores,
                &plan,
                &manifest.run_config,
                run_id,
                writer,
                history.last_checkpoint.clone(),
                &history.completed_states,
                Some(start),
                facts.clone(),
                Arc::clone(&self.live_transport_factory),
                self.failpoints.clone(),
            )
            .await;
        }

        // Otherwise, find the first state (in topological order) that has
        // not yet completed.
        let ordered = topological_order(&plan)
            .map_err(|_| invalid_plan("invalid_plan", "execution plan failed validation"))?;
        let next_state = ordered
            .iter()
            .find(|n| !history.completed_states.contains(&n.id))
            .map(|n| n.id.clone());

        // Case 2: every state already completed but RunCompleted was never
        // written — finalize now.
        let Some(next_state_id) = next_state else {
            info!("all states already completed; finalizing run");
            writer
                .lock()
                .await
                .append_kernel(KernelEvent::RunCompleted {
                    status: RunStatus::Completed,
                    final_snapshot_id: Some(history.last_checkpoint.clone()),
                })
                .await
                .map_err(RunError::Storage)?;
            return Ok(RunResult {
                run_id,
                phase: RunPhase::Completed,
                final_snapshot_id: Some(history.last_checkpoint.clone()),
            });
        };

        // Case 3: the next state previously failed. Either finalize the run
        // as failed (non-retryable or retries exhausted) or retry from the
        // failed attempt's base snapshot.
        if let Some((attempt, base_snapshot, retryable)) =
            history.last_failure_by_state.get(&next_state_id)
        {
            let next = attempt + 1;
            if !*retryable || next >= manifest.run_config.retry_policy.max_attempts {
                warn!(
                    state_id = %next_state_id,
                    attempt = *attempt,
                    retryable = *retryable,
                    max_attempts = manifest.run_config.retry_policy.max_attempts,
                    "resume cannot retry failed state; finalizing run as failed"
                );
                writer
                    .lock()
                    .await
                    .append_kernel(KernelEvent::RunCompleted {
                        status: RunStatus::Failed,
                        final_snapshot_id: Some(history.last_checkpoint.clone()),
                    })
                    .await
                    .map_err(RunError::Storage)?;
                return Ok(RunResult {
                    run_id,
                    phase: RunPhase::Failed,
                    final_snapshot_id: Some(history.last_checkpoint.clone()),
                });
            }

            info!(
                state_id = %next_state_id,
                next_attempt = next,
                base_snapshot_id = %base_snapshot.0,
                "resuming from failed state with retry"
            );
            let start = (next_state_id.clone(), next, base_snapshot.clone());
            return run_states(
                &stores,
                &plan,
                &manifest.run_config,
                run_id,
                writer,
                history.last_checkpoint.clone(),
                &history.completed_states,
                Some(start),
                facts.clone(),
                Arc::clone(&self.live_transport_factory),
                self.failpoints.clone(),
            )
            .await;
        }

        // Case 4: no failure recorded — continue at the next state from the
        // last checkpoint with the next attempt number for that state.
        let start = (
            next_state_id.clone(),
            next_attempt(&history.last_attempt_by_state, &next_state_id),
            history.last_checkpoint.clone(),
        );
        info!(
            state_id = %next_state_id,
            attempt = start.1,
            base_snapshot_id = %history.last_checkpoint.0,
            "resuming run at next state"
        );
        run_states(
            &stores,
            &plan,
            &manifest.run_config,
            run_id,
            writer,
            history.last_checkpoint.clone(),
            &history.completed_states,
            Some(start),
            facts,
            Arc::clone(&self.live_transport_factory),
            self.failpoints.clone(),
        )
        .await
    }
}
865
866#[cfg(test)]
867#[path = "tests/runtime_tests.rs"]
868mod runtime_tests;