//! runkon_flow/engine.rs — core workflow execution engine: execution state,
//! child-workflow plumbing, and the shared run/step orchestration helpers.
1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::cancellation::CancellationToken;
7use crate::constants::FLOW_OUTPUT_INSTRUCTION;
8use crate::dsl::{InputType, OnFail, WorkflowDef, WorkflowNode};
9use crate::engine_error::{EngineError, Result};
10use crate::events::{EngineEvent, EventSink};
11use crate::extensions::{Extensions, LlmRunMetrics};
12use crate::output_schema::OutputSchema;
13use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
14use crate::traits::action_executor::ActionRegistry;
15use crate::traits::item_provider::ItemProviderRegistry;
16use crate::traits::persistence::WorkflowPersistence;
17use crate::traits::run_context::RunContext;
18use crate::traits::script_env_provider::ScriptEnvProvider;
19use crate::types::{
20    ContextEntry, StepKey, StepResult, WorkflowExecConfig, WorkflowResult, WorkflowRunStep,
21};
22
/// Pre-loaded context for resuming a workflow run.
///
/// Built by the resume path before execution starts and consulted on every
/// step boundary (see `should_skip` / `restore_completed_step`), so lookups
/// must stay cheap.
#[derive(Clone)]
pub struct ResumeContext {
    /// Completed step records keyed by (step_name, iteration), for O(1) zero-alloc lookup.
    pub step_map: HashMap<String, HashMap<u32, WorkflowRunStep>>,
}
29
/// Mutable runtime state for a workflow execution — no conductor-core deps.
///
/// Cloned wholesale by [`ExecutionState::fork_child`], which then resets the
/// runtime accumulators below; keep that method in sync when adding fields.
#[derive(Clone)]
pub struct ExecutionState {
    // --- Shared services & identity (inherited by child runs) ---
    pub persistence: Arc<dyn WorkflowPersistence>,
    pub action_registry: Arc<ActionRegistry>,
    pub script_env_provider: Arc<dyn ScriptEnvProvider>,
    pub workflow_run_id: String,
    pub workflow_name: String,
    /// Shared per-run context carrying injected variables and working directory.
    /// `Arc` (not `Box`) because `ExecutionState` derives `Clone` for `fork_child`.
    pub run_ctx: Arc<dyn RunContext>,
    /// Extra plugin directories for the executor. Not part of `RunContext`
    /// because `Vec<String>` doesn't fit the `HashMap<&'static str, String>`
    /// injected-variables contract, and only executor code reads it.
    pub extra_plugin_dirs: Vec<String>,
    pub model: Option<String>,
    pub exec_config: WorkflowExecConfig,
    /// Workflow input variables. Also used as a scratch channel by
    /// `run_on_fail_agent` (`failed_step` / `failure_reason` / `retry_count`).
    pub inputs: HashMap<String, String>,
    pub parent_run_id: String,
    // NOTE(review): presumably the nesting depth of this run (0 at top level)
    // — confirm against the caller that sets ChildWorkflowInput.depth.
    pub depth: u32,
    pub target_label: Option<String>,
    // Runtime — everything from here down is reset by `fork_child`.
    pub step_results: HashMap<String, StepResult>,
    pub contexts: Vec<ContextEntry>,
    // NOTE(review): looks like an ordering counter for persisted records;
    // reset to 0 on fork — confirm exact semantics against persistence callers.
    pub position: i64,
    pub all_succeeded: bool,
    pub total_cost: f64,
    pub total_turns: i64,
    pub total_duration_ms: i64,
    pub total_input_tokens: i64,
    pub total_output_tokens: i64,
    pub total_cache_read_input_tokens: i64,
    pub total_cache_creation_input_tokens: i64,
    /// True once any step contributed LLM metrics; gates emission of the
    /// `LlmRunMetrics` extension in `run_workflow_engine`.
    pub has_llm_metrics: bool,
    pub last_gate_feedback: Option<String>,
    pub block_output: Option<String>,
    pub block_with: Vec<String>,
    /// Present only when resuming; `None` for fresh runs and forked children.
    pub resume_ctx: Option<ResumeContext>,
    pub default_as_identity: Option<String>,
    pub triggered_by_hook: bool,
    /// Schema resolver callback — (schema_name) → OutputSchema.
    /// The host closes over working_dir and repo_path at construction time.
    #[allow(clippy::type_complexity)]
    pub schema_resolver: Option<Arc<dyn Fn(&str) -> Result<OutputSchema> + Send + Sync>>,
    /// Runner for child workflows (call workflow nodes).
    pub child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    /// Last throttled-heartbeat timestamp (unix seconds); see
    /// `check_cancellation_throttled`.
    pub last_heartbeat_at: Arc<AtomicI64>,
    pub registry: Arc<ItemProviderRegistry>,
    /// Event sinks — slice shared cheaply across sub-workflow states.
    pub event_sinks: Arc<[Arc<dyn EventSink>]>,
    /// Cancellation token for this run. Checked at each step boundary.
    pub cancellation: CancellationToken,
    /// The executor label and step_id of the currently executing action, if any.
    /// Written by execute_call before dispatch; read by FlowEngine::cancel_run()
    /// to fire-and-forget executor.cancel().
    pub current_execution_id: Arc<Mutex<Option<(String, String)>>>,
    /// Lease token held by this engine instance after a successful acquire_lease().
    /// Used by PRs 3–5 for refresh and generation checks.
    pub owner_token: Option<String>,
    /// Lease generation; required by `expect_lease_generation`, `None` until
    /// FlowEngine::run/resume sets it.
    pub lease_generation: Option<i64>,
}
91
/// Input parameters for child workflow execution.
pub struct ChildWorkflowInput {
    /// Resolved input variables for the child run (see `resolve_child_inputs`).
    pub inputs: HashMap<String, String>,
    // NOTE(review): presumably the iteration of the parent step spawning this
    // child (e.g. a foreach index) — confirm against call sites.
    pub iteration: u32,
    /// Optional identity override for the child run.
    pub as_identity: Option<String>,
    /// Nesting depth assigned to the child run.
    pub depth: u32,
    /// Step id of the parent step that spawned this child, if known.
    pub parent_step_id: Option<String>,
    /// Child token derived from the parent run's cancellation token.
    /// The child runner sets this as the child `ExecutionState.cancellation`
    /// so that cancelling the parent automatically cancels in-progress child runs.
    pub cancellation: CancellationToken,
}
104
/// Subset of `ExecutionState` exposed to `ChildWorkflowRunner` implementations.
///
/// The full `ExecutionState` carries the engine's mutable runtime — registries,
/// accumulators, schema resolver, position pointer, cancellation token —
/// none of which a harness needs to spawn a child workflow run. Passing it
/// across the trait boundary makes every `ExecutionState` field rename or
/// restructuring a breaking change for every `ChildWorkflowRunner` implementor.
///
/// `ChildWorkflowContext` is the narrow, stable surface: every field listed
/// here is something the bridge actually reads when constructing the child
/// run. Build via [`ExecutionState::child_workflow_context`].
#[non_exhaustive]
#[derive(Clone)]
pub struct ChildWorkflowContext {
    pub run_ctx: Arc<dyn RunContext>,
    pub extra_plugin_dirs: Vec<String>,
    /// Run id of the PARENT run spawning the child (copied from
    /// `ExecutionState.workflow_run_id`).
    pub workflow_run_id: String,
    pub model: Option<String>,
    pub exec_config: WorkflowExecConfig,
    pub inputs: HashMap<String, String>,
    pub event_sinks: Arc<[Arc<dyn EventSink>]>,
}
127
128impl ChildWorkflowContext {
129    pub fn new(
130        run_ctx: Arc<dyn RunContext>,
131        extra_plugin_dirs: Vec<String>,
132        workflow_run_id: String,
133        model: Option<String>,
134        exec_config: WorkflowExecConfig,
135        inputs: HashMap<String, String>,
136        event_sinks: Arc<[Arc<dyn EventSink>]>,
137    ) -> Self {
138        Self {
139            run_ctx,
140            extra_plugin_dirs,
141            workflow_run_id,
142            model,
143            exec_config,
144            inputs,
145            event_sinks,
146        }
147    }
148}
149
/// Trait for executing child workflows — allows conductor-core to inject its adapter.
pub trait ChildWorkflowRunner: Send + Sync {
    /// Run `workflow_name` as a child of the run described by `parent_ctx`.
    ///
    /// `params` carries the child's resolved inputs, iteration, identity
    /// override, nesting depth, originating step id, and the derived
    /// cancellation token.
    fn execute_child(
        &self,
        workflow_name: &str,
        parent_ctx: &ChildWorkflowContext,
        params: ChildWorkflowInput,
    ) -> Result<WorkflowResult>;

    /// Resume a previously started child run by its run id, optionally
    /// overriding the model; `parent_ctx` supplies the parent-side environment.
    fn resume_child(
        &self,
        workflow_run_id: &str,
        model: Option<&str>,
        parent_ctx: &ChildWorkflowContext,
    ) -> Result<WorkflowResult>;

    /// Look up a resumable child run of `parent_run_id` for `workflow_name`.
    /// `Ok(None)` means there is nothing to resume.
    fn find_resumable_child(
        &self,
        parent_run_id: &str,
        workflow_name: &str,
    ) -> Result<Option<crate::types::WorkflowRun>>;
}
172
173impl ExecutionState {
174    /// Create a fresh heartbeat counter, initialized to 0 so the first tick fires immediately.
175    pub fn new_heartbeat() -> Arc<AtomicI64> {
176        Arc::new(AtomicI64::new(0))
177    }
178
179    /// Read the current lease generation, panicking with a consistent message
180    /// if the lease was never acquired. Every executor `update_step` call site
181    /// requires a generation, and `FlowEngine::run`/`resume` is the single
182    /// entry point that sets it — so a `None` here is a programmer error in
183    /// engine construction, not a runtime condition.
184    pub fn expect_lease_generation(&self) -> i64 {
185        self.lease_generation
186            .expect("lease_generation must be set after FlowEngine::run/resume entry")
187    }
188
189    /// Throttled heartbeat tick + external cancel check.
190    ///
191    /// Bumps `last_heartbeat` in persistence at most once every 5 seconds and
192    /// polls for cross-process cancellation via `persistence.is_run_cancelled`.
193    /// On external cancel, sets `self.cancellation` and returns
194    /// `Err(EngineError::Cancelled)`.
195    ///
196    /// Callers that own the engine main loop use `?` to propagate cancellation
197    /// up. Wait loops that need to drain in-flight work (parallel, foreach)
198    /// can call this best-effort and rely on `self.cancellation.is_cancelled()`
199    /// for their controlled exit — the cancellation token is set by this
200    /// helper before the `Err` is returned.
201    ///
202    /// Without this being called from inside long-running wait loops (parallel
203    /// blocks, foreach fan-out), the cancellation check is skipped during
204    /// multi-minute waits — see #2731.
205    ///
206    /// NOTE (#2731/#2796): lease refresh (refresh_lease_loop in flow_engine.rs)
207    /// is the load-bearing ownership mechanism. `detect_stuck_workflow_run_ids`
208    /// falls back to `started_at` when `last_heartbeat` is NULL (new runs).
209    pub fn check_cancellation_throttled(&self) -> Result<()> {
210        use crate::cancellation_reason::CancellationReason;
211
212        let now_secs = SystemTime::now()
213            .duration_since(UNIX_EPOCH)
214            .unwrap_or_else(|e| {
215                tracing::warn!("system clock regressed: {e}; cancellation check suppressed");
216                e.duration()
217            })
218            .as_secs() as i64;
219        let last = self.last_heartbeat_at.load(Ordering::Relaxed);
220        if now_secs - last < 5 {
221            return Ok(());
222        }
223        self.last_heartbeat_at.store(now_secs, Ordering::Relaxed);
224        match self.persistence.is_run_cancelled(&self.workflow_run_id) {
225            Ok(true) => {
226                tracing::info!(
227                    "Workflow run {} cancelled externally, stopping execution",
228                    self.workflow_run_id
229                );
230                self.cancellation
231                    .cancel(CancellationReason::UserRequested(None));
232                return Err(EngineError::Cancelled(CancellationReason::UserRequested(
233                    None,
234                )));
235            }
236            Ok(false) => {}
237            Err(e) => {
238                tracing::warn!(
239                    "Database error during cancellation check for workflow run {}: {}",
240                    self.workflow_run_id,
241                    e
242                );
243            }
244        }
245        Ok(())
246    }
247
248    /// Project this state into the narrow surface a `ChildWorkflowRunner`
249    /// implementation needs to spawn a child run.
250    pub fn child_workflow_context(&self) -> ChildWorkflowContext {
251        ChildWorkflowContext {
252            run_ctx: Arc::clone(&self.run_ctx),
253            extra_plugin_dirs: self.extra_plugin_dirs.clone(),
254            workflow_run_id: self.workflow_run_id.clone(),
255            model: self.model.clone(),
256            exec_config: self.exec_config.clone(),
257            inputs: self.inputs.clone(),
258            event_sinks: Arc::clone(&self.event_sinks),
259        }
260    }
261
262    /// Fork a child execution state from this parent.
263    ///
264    /// Copies shared configuration (persistence, registries, workflow identity) and resets all
265    /// runtime accumulators so the child starts with a clean slate.
266    pub fn fork_child(&self, cancellation: CancellationToken) -> ExecutionState {
267        let mut child = self.clone();
268        child.inputs.clear();
269        child.step_results.clear();
270        child.contexts.clear();
271        child.position = 0;
272        child.all_succeeded = true;
273        child.total_cost = 0.0;
274        child.total_turns = 0;
275        child.total_duration_ms = 0;
276        child.total_input_tokens = 0;
277        child.total_output_tokens = 0;
278        child.total_cache_read_input_tokens = 0;
279        child.total_cache_creation_input_tokens = 0;
280        child.has_llm_metrics = false;
281        child.last_gate_feedback = None;
282        child.block_output = None;
283        child.block_with.clear();
284        child.resume_ctx = None;
285        child.triggered_by_hook = false;
286        child.last_heartbeat_at = Self::new_heartbeat();
287        child.cancellation = cancellation;
288        child.current_execution_id = Arc::new(std::sync::Mutex::new(None));
289        child.owner_token = None;
290        child.lease_generation = None;
291        child
292    }
293
294    /// Accumulate individual metrics into this execution state.
295    ///
296    /// Returns `true` if at least one metric was present and added.
297    #[allow(clippy::too_many_arguments)]
298    pub fn accumulate_metrics(
299        &mut self,
300        cost: Option<f64>,
301        turns: Option<i64>,
302        duration: Option<i64>,
303        input_tokens: Option<i64>,
304        output_tokens: Option<i64>,
305        cache_read: Option<i64>,
306        cache_create: Option<i64>,
307    ) -> bool {
308        let mut changed = false;
309        if let Some(c) = cost {
310            self.total_cost += c;
311            changed = true;
312        }
313        if let Some(t) = turns {
314            self.total_turns += t;
315            changed = true;
316        }
317        if let Some(d) = duration {
318            self.total_duration_ms += d;
319            changed = true;
320        }
321        if let Some(t) = input_tokens {
322            self.total_input_tokens += t;
323            changed = true;
324        }
325        if let Some(t) = output_tokens {
326            self.total_output_tokens += t;
327            changed = true;
328        }
329        if let Some(t) = cache_read {
330            self.total_cache_read_input_tokens += t;
331            changed = true;
332        }
333        if let Some(t) = cache_create {
334            self.total_cache_creation_input_tokens += t;
335            changed = true;
336        }
337        changed
338    }
339}
340
341/// Resolve a schema by name using the schema_resolver callback.
342pub fn resolve_schema(state: &ExecutionState, name: &str) -> Result<OutputSchema> {
343    match &state.schema_resolver {
344        Some(resolver) => resolver(name),
345        None => Err(EngineError::Workflow(format!(
346            "No schema resolver configured — cannot load schema '{name}'"
347        ))),
348    }
349}
350
351/// Emit an engine event to all registered sinks.
352///
353/// Each sink is called inside `catch_unwind(AssertUnwindSafe(...))`. Panics are
354/// logged via `tracing::warn!` and do not abort the run or skip remaining sinks.
355pub fn emit_event(state: &ExecutionState, event: EngineEvent) {
356    crate::events::emit_to_sinks(&state.workflow_run_id, event, &state.event_sinks);
357}
358
359/// Extract completed step keys from a slice of step records.
360pub fn completed_keys_from_steps(steps: &[WorkflowRunStep]) -> HashSet<StepKey> {
361    steps
362        .iter()
363        .filter(|s| s.status == WorkflowStepStatus::Completed)
364        .map(|s| (s.step_name.clone(), s.iteration as u32))
365        .collect()
366}
367
368/// Validate required workflow inputs are present and apply default values.
369pub fn apply_workflow_input_defaults(
370    workflow: &WorkflowDef,
371    inputs: &mut HashMap<String, String>,
372) -> Result<()> {
373    for input_decl in &workflow.inputs {
374        if input_decl.required && !inputs.contains_key(&input_decl.name) {
375            return Err(EngineError::Workflow(format!(
376                "Missing required input: '{}'. Use --input {}=<value>.",
377                input_decl.name, input_decl.name
378            )));
379        }
380        if let Some(ref default) = input_decl.default {
381            inputs
382                .entry(input_decl.name.clone())
383                .or_insert_with(|| default.clone());
384        }
385        if input_decl.input_type == InputType::Boolean {
386            inputs
387                .entry(input_decl.name.clone())
388                .or_insert_with(|| "false".to_string());
389        }
390    }
391    Ok(())
392}
393
/// Shared orchestration: execute body → always block → build summary → finalize.
///
/// Flow:
/// 1. Emit `RunStarted`/`RunResumed` (the run record was persisted before this fn).
/// 2. Execute the main body; any body error flips `all_succeeded` and is kept
///    for the summary and the persisted error field.
/// 3. Execute the `always` block (if any) regardless of body outcome, with
///    `workflow_status` injected into inputs.
/// 4. Build the textual summary, persist the terminal run status
///    (Completed / Cancelled / Failed), and emit the matching terminal event.
///
/// Returns `Ok(WorkflowResult)` even when the body failed — callers inspect
/// `all_succeeded`. An `Err` from this function means a persistence write
/// (`update_run_status`) failed.
pub fn run_workflow_engine(
    state: &mut ExecutionState,
    workflow: &WorkflowDef,
) -> Result<WorkflowResult> {
    // Emit RunStarted or RunResumed (DB write for the run record already happened before this fn).
    if state.resume_ctx.is_some() {
        emit_event(
            state,
            EngineEvent::RunResumed {
                workflow_name: workflow.name.clone(),
            },
        );
    } else {
        emit_event(
            state,
            EngineEvent::RunStarted {
                workflow_name: workflow.name.clone(),
            },
        );
    }

    // Execute main body
    let mut body_error: Option<String> = None;
    let body_result = execute_nodes(state, &workflow.body, true);
    if let Err(ref e) = body_result {
        let msg = e.to_string();
        tracing::error!("Body execution error: {msg}");
        state.all_succeeded = false;
        body_error = Some(msg);
        // Mirror LeaseLost onto the cancellation token so FlowEngine::run's
        // lease_lost_during_run check fires even when the error reached us via
        // a step-write failure rather than the refresh thread.
        if matches!(
            e,
            EngineError::Cancelled(crate::cancellation_reason::CancellationReason::LeaseLost)
        ) {
            state
                .cancellation
                .cancel(crate::cancellation_reason::CancellationReason::LeaseLost);
        }
    }

    // Execute always block regardless of outcome
    if !workflow.always.is_empty() {
        // Expose the body's outcome to the always block via the inputs map.
        let workflow_status = if state.all_succeeded {
            "completed"
        } else {
            "failed"
        };
        state
            .inputs
            .insert("workflow_status".to_string(), workflow_status.to_string());
        // Snapshot all_succeeded so the always block cannot change the terminal status.
        let saved_all_succeeded = state.all_succeeded;
        let always_result = execute_nodes(state, &workflow.always, false);
        state.all_succeeded = saved_all_succeeded;
        if let Err(ref e) = always_result {
            tracing::warn!("Always block error (non-fatal): {e}");
        }
    }

    // Build summary
    let mut summary = crate::helpers::build_workflow_summary(state);
    if let Some(ref err) = body_error {
        summary.push_str(&format!("\nError: {err}"));
    }

    // Finalize run status via persistence
    let wf_run_id = state.workflow_run_id.clone();
    let is_cancelled = matches!(&body_result, Err(EngineError::Cancelled(_)));

    // Metrics are emitted before the terminal event regardless of outcome.
    emit_event(
        state,
        EngineEvent::MetricsUpdated {
            total_cost: state.total_cost,
            total_turns: state.total_turns,
            total_duration_ms: state.total_duration_ms,
        },
    );

    if state.all_succeeded {
        state.persistence.update_run_status(
            &wf_run_id,
            WorkflowRunStatus::Completed,
            Some(&summary),
            None,
        )?;
        tracing::info!("Workflow '{}' completed successfully", workflow.name);
        emit_event(state, EngineEvent::RunCompleted { succeeded: true });
    } else if is_cancelled {
        // Prefer the reason recorded on the token; fall back to a generic
        // user-requested cancellation if none was set.
        let cancel_reason = state
            .cancellation
            .reason()
            .unwrap_or(crate::cancellation_reason::CancellationReason::UserRequested(None));
        state.persistence.update_run_status(
            &wf_run_id,
            WorkflowRunStatus::Cancelled,
            Some(&summary),
            body_error.as_deref(),
        )?;
        tracing::warn!("Workflow '{}' was cancelled", workflow.name);
        emit_event(
            state,
            EngineEvent::RunCancelled {
                reason: cancel_reason,
            },
        );
    } else {
        state.persistence.update_run_status(
            &wf_run_id,
            WorkflowRunStatus::Failed,
            Some(&summary),
            body_error.as_deref(),
        )?;
        tracing::warn!("Workflow '{}' finished with failures", workflow.name);
        emit_event(state, EngineEvent::RunCompleted { succeeded: false });
    }

    tracing::info!(
        "Total: ${:.4}, {} turns, {:.1}s",
        state.total_cost,
        state.total_turns,
        state.total_duration_ms as f64 / 1000.0
    );

    // Attach LLM metrics as an extension only when at least one field is set,
    // so consumers can distinguish "no metrics" from "all zero".
    let mut result_extensions = Extensions::default();
    if state.has_llm_metrics {
        let metrics = LlmRunMetrics {
            total_input_tokens: (state.total_input_tokens != 0).then_some(state.total_input_tokens),
            total_output_tokens: (state.total_output_tokens != 0)
                .then_some(state.total_output_tokens),
            total_cache_read_input_tokens: (state.total_cache_read_input_tokens != 0)
                .then_some(state.total_cache_read_input_tokens),
            total_cache_creation_input_tokens: (state.total_cache_creation_input_tokens != 0)
                .then_some(state.total_cache_creation_input_tokens),
            total_turns: (state.total_turns != 0).then_some(state.total_turns),
            total_cost_usd: (state.total_cost != 0.0).then_some(state.total_cost),
            model: state.model.clone(),
        };
        if metrics.total_input_tokens.is_some()
            || metrics.total_output_tokens.is_some()
            || metrics.total_cache_read_input_tokens.is_some()
            || metrics.total_cache_creation_input_tokens.is_some()
            || metrics.total_turns.is_some()
            || metrics.total_cost_usd.is_some()
            || metrics.model.is_some()
        {
            result_extensions.insert(metrics);
        }
    }

    Ok(WorkflowResult {
        workflow_run_id: wf_run_id,
        workflow_name: workflow.name.clone(),
        all_succeeded: state.all_succeeded,
        total_duration_ms: state.total_duration_ms,
        extensions: result_extensions,
    })
}
554
555/// Walk a list of workflow nodes, dispatching to the appropriate handler.
556pub fn execute_single_node(
557    state: &mut ExecutionState,
558    node: &WorkflowNode,
559    iteration: u32,
560) -> Result<()> {
561    match node {
562        WorkflowNode::Call(n) => crate::executors::call::execute_call(state, n, iteration)?,
563        WorkflowNode::CallWorkflow(n) => {
564            crate::executors::call_workflow::execute_call_workflow(state, n, iteration)?
565        }
566        WorkflowNode::If(n) => crate::executors::control_flow::execute_if(state, n)?,
567        WorkflowNode::Unless(n) => crate::executors::control_flow::execute_unless(state, n)?,
568        WorkflowNode::While(n) => crate::executors::control_flow::execute_while(state, n)?,
569        WorkflowNode::DoWhile(n) => crate::executors::control_flow::execute_do_while(state, n)?,
570        WorkflowNode::Do(n) => crate::executors::control_flow::execute_do(state, n)?,
571        WorkflowNode::Parallel(n) => {
572            crate::executors::parallel::execute_parallel(state, n, iteration)?
573        }
574        WorkflowNode::Gate(n) => crate::executors::gate::execute_gate(state, n, iteration)?,
575        WorkflowNode::Script(n) => crate::executors::script::execute_script(state, n, iteration)?,
576        WorkflowNode::ForEach(n) => {
577            crate::executors::foreach::execute_foreach(state, n, iteration)?
578        }
579        WorkflowNode::Always(n) => {
580            // Nested always — just execute body
581            execute_nodes(state, &n.body, false)?;
582        }
583    }
584    Ok(())
585}
586
587pub fn execute_nodes(
588    state: &mut ExecutionState,
589    nodes: &[WorkflowNode],
590    respect_fail_fast: bool,
591) -> Result<()> {
592    for node in nodes {
593        if respect_fail_fast && !state.all_succeeded && state.exec_config.fail_fast {
594            break;
595        }
596        // Cheap in-memory token check first (no I/O).
597        if state.cancellation.is_cancelled() {
598            return state.cancellation.error_if_cancelled();
599        }
600        state.check_cancellation_throttled()?;
601        execute_single_node(state, node, 0)?;
602    }
603    Ok(())
604}
605
606/// Record a failed step result and optionally return a fail-fast error.
607pub fn record_step_failure(
608    state: &mut ExecutionState,
609    step_key: String,
610    step_label: &str,
611    last_error: String,
612    max_attempts: u32,
613    started: bool,
614) -> Result<()> {
615    state.all_succeeded = false;
616    let step_result = StepResult::failed(step_label, last_error);
617    state.step_results.insert(step_key, step_result);
618
619    if state.exec_config.fail_fast {
620        let msg = if started {
621            format!(
622                "Step '{}' failed after {} attempts",
623                step_label, max_attempts
624            )
625        } else {
626            format!("Step '{}' failed to start (never executed)", step_label)
627        };
628        return Err(EngineError::Workflow(msg));
629    }
630
631    Ok(())
632}
633
634/// Record a skipped step (on_fail = continue): insert StepResult with Skipped status.
635pub fn record_step_skipped(state: &mut ExecutionState, step_key: String, step_label: &str) {
636    tracing::info!("Step '{}' skipped via on_fail = continue", step_label);
637    let step_result = StepResult::skipped(step_label);
638    state.step_results.insert(step_key, step_result);
639}
640
641/// Parse a metric value from a metadata map, logging a warning on parse failure.
642fn parse_metric_f64(map: &std::collections::HashMap<String, String>, key: &str) -> Option<f64> {
643    map.get(key).and_then(|v| {
644        v.parse::<f64>()
645            .map_err(|e| tracing::warn!("metadata key '{key}' has non-numeric value '{v}': {e}"))
646            .ok()
647    })
648}
649
650/// Parse an integer metric value from a metadata map, logging a warning on parse failure.
651fn parse_metric_i64(map: &std::collections::HashMap<String, String>, key: &str) -> Option<i64> {
652    map.get(key).and_then(|v| {
653        v.parse::<i64>()
654            .map_err(|e| tracing::warn!("metadata key '{key}' has non-integer value '{v}': {e}"))
655            .ok()
656    })
657}
658
659/// Record a successful step: accumulate stats, insert StepResult, push context.
660pub fn record_step_success(
661    state: &mut ExecutionState,
662    step_key: String,
663    success: crate::types::StepSuccess,
664) {
665    use crate::constants::metadata_keys;
666    let cost_usd = parse_metric_f64(&success.metadata, metadata_keys::COST_USD);
667    let num_turns = parse_metric_i64(&success.metadata, metadata_keys::NUM_TURNS);
668    let duration_ms = parse_metric_i64(&success.metadata, metadata_keys::DURATION_MS);
669    let input_tokens = parse_metric_i64(&success.metadata, metadata_keys::INPUT_TOKENS);
670    let output_tokens = parse_metric_i64(&success.metadata, metadata_keys::OUTPUT_TOKENS);
671    let cache_read = parse_metric_i64(&success.metadata, metadata_keys::CACHE_READ_INPUT_TOKENS);
672    let cache_creation = parse_metric_i64(
673        &success.metadata,
674        metadata_keys::CACHE_CREATION_INPUT_TOKENS,
675    );
676    if state.accumulate_metrics(
677        cost_usd,
678        num_turns,
679        duration_ms,
680        input_tokens,
681        output_tokens,
682        cache_read,
683        cache_creation,
684    ) {
685        state.has_llm_metrics = true;
686    }
687
688    let step_result = StepResult::completed(&success);
689    state.step_results.insert(step_key, step_result);
690
691    state.contexts.push(success.into());
692}
693
694/// Resolve child workflow inputs: substitute variables, apply defaults, and
695/// check for missing required inputs.
696pub fn resolve_child_inputs(
697    raw_inputs: &HashMap<String, String>,
698    vars: &HashMap<String, String>,
699    input_decls: &[crate::dsl::InputDecl],
700) -> std::result::Result<HashMap<String, String>, String> {
701    let mut child_inputs = HashMap::new();
702    for (k, v) in raw_inputs {
703        child_inputs.insert(
704            k.clone(),
705            crate::prompt_builder::substitute_variables_keep_literal(v, vars),
706        );
707    }
708    for decl in input_decls {
709        if !child_inputs.contains_key(&decl.name) {
710            if decl.required {
711                return Err(decl.name.clone());
712            }
713            if let Some(ref default) = decl.default {
714                child_inputs.insert(decl.name.clone(), default.clone());
715            }
716            if decl.input_type == crate::dsl::InputType::Boolean {
717                child_inputs
718                    .entry(decl.name.clone())
719                    .or_insert_with(|| "false".to_string());
720            }
721        }
722    }
723    Ok(child_inputs)
724}
725
726/// Run the on_fail agent after all retries for a step are exhausted.
727pub fn run_on_fail_agent(
728    state: &mut ExecutionState,
729    step_label: &str,
730    on_fail_agent: &crate::dsl::AgentRef,
731    last_error: &str,
732    retries: u32,
733    iteration: u32,
734) {
735    tracing::warn!(
736        "All retries exhausted for '{}', running on_fail agent '{}'",
737        step_label,
738        on_fail_agent.label(),
739    );
740    state
741        .inputs
742        .insert("failed_step".to_string(), step_label.to_string());
743    state
744        .inputs
745        .insert("failure_reason".to_string(), last_error.to_string());
746    state
747        .inputs
748        .insert("retry_count".to_string(), retries.to_string());
749
750    let on_fail_node = crate::dsl::CallNode {
751        agent: on_fail_agent.clone(),
752        retries: 0,
753        on_fail: None,
754        output: None,
755        with: Vec::new(),
756        as_identity: None,
757        plugin_dirs: Vec::new(),
758        timeout: None,
759        max_turns: None,
760    };
761    if let Err(e) = crate::executors::call::execute_call(state, &on_fail_node, iteration) {
762        tracing::warn!("on_fail agent '{}' also failed: {e}", on_fail_agent.label(),);
763    }
764
765    state.inputs.remove("failed_step");
766    state.inputs.remove("failure_reason");
767    state.inputs.remove("retry_count");
768}
769
770/// Dispatch `on_fail` after all retries are exhausted, then record the failure.
771#[allow(clippy::too_many_arguments)]
772pub fn handle_on_fail(
773    state: &mut ExecutionState,
774    step_key: String,
775    step_label: &str,
776    on_fail: &Option<OnFail>,
777    last_error: String,
778    retries: u32,
779    iteration: u32,
780    max_attempts: u32,
781) -> Result<()> {
782    match on_fail {
783        Some(OnFail::Continue) => {
784            record_step_skipped(state, step_key, step_label);
785            return Ok(());
786        }
787        Some(OnFail::Agent(ref on_fail_agent)) => {
788            run_on_fail_agent(
789                state,
790                step_label,
791                on_fail_agent,
792                &last_error,
793                retries,
794                iteration,
795            );
796        }
797        None => {}
798    }
799    record_step_failure(state, step_key, step_label, last_error, max_attempts, true)
800}
801
802/// Check whether a step should be skipped on resume.
803pub fn should_skip(state: &ExecutionState, step_name: &str, iteration: u32) -> bool {
804    state.resume_ctx.as_ref().is_some_and(|ctx| {
805        ctx.step_map
806            .get(step_name)
807            .is_some_and(|m| m.contains_key(&iteration))
808    })
809}
810
811/// Deserialize a `markers_out` JSON string into a `Vec<String>`, logging on error.
812fn parse_markers_out(markers_json: Option<&str>, step_name: &str) -> Vec<String> {
813    markers_json
814        .and_then(|m| {
815            serde_json::from_str(m)
816                .map_err(|e| {
817                    tracing::warn!("Malformed markers_out JSON in step '{step_name}': {e}")
818                })
819                .ok()
820        })
821        .unwrap_or_default()
822}
823
824/// Temporarily take the `ResumeContext` out of `state` so we can borrow `state`
825/// mutably while reading from the context's maps.
826pub fn restore_step(state: &mut ExecutionState, key: &str, iteration: u32) {
827    let ctx = state.resume_ctx.take();
828    if let Some(ref ctx) = ctx {
829        restore_completed_step(state, ctx, key, iteration);
830    }
831    state.resume_ctx = ctx;
832}
833
834/// Restore a completed step's results from the resume context into the execution state.
835pub fn restore_completed_step(
836    state: &mut ExecutionState,
837    ctx: &ResumeContext,
838    step_key: &str,
839    iteration: u32,
840) {
841    let completed_step = ctx.step_map.get(step_key).and_then(|m| m.get(&iteration));
842
843    let Some(step) = completed_step else {
844        tracing::warn!(
845            "resume: step '{step_key}:{iteration}' in skip set but not found in resume context \
846             — downstream variable substitution may be incorrect"
847        );
848        return;
849    };
850
851    let markers = parse_markers_out(step.markers_out.as_deref(), step_key);
852    let context = step.context_out.clone().unwrap_or_default();
853
854    // Restore gate feedback if this was a gate step
855    if let Some(ref feedback) = step.gate_feedback {
856        state.last_gate_feedback = Some(feedback.clone());
857    }
858
859    let success = crate::types::StepSuccess::from_workflow_run_step(
860        step_key.to_string(),
861        step,
862        markers,
863        context,
864        iteration,
865    );
866    let step_result = StepResult::completed_without_metrics(&success);
867    state.step_results.insert(step_key.to_string(), step_result);
868
869    state.contexts.push(success.into());
870}
871
/// Returned by [`fetch_child_completion_data`].
///
/// Tuple layout:
/// `0` — final-step `(markers, context)` of the child workflow.
/// `1` — bubble-up step-results map keyed by child step name (used by parent
/// for `if step.marker` checks).
/// `2` — child step contexts in chronological order (pushed into parent's
/// `state.contexts` so downstream agents see child step outputs in `prior_contexts`).
pub type ChildCompletionData = (
    (Vec<String>, String),
    HashMap<String, StepResult>,
    Vec<ContextEntry>,
);
883
884/// Fetch the final step output, the bubble-up step-results map, AND the
885/// child workflow's context entries — in a single DB query. The context
886/// entries are pushed into the parent's `state.contexts` so that downstream
887/// agents in the parent workflow see child-workflow step outputs via
888/// `prior_contexts` in their prompt templates. Without this third return
889/// value, only `state.step_results` carries child step data and parent-side
890/// agents have no access to child step `context_out` / `structured_output`.
891pub fn fetch_child_completion_data(
892    persistence: &dyn WorkflowPersistence,
893    workflow_run_id: &str,
894) -> ChildCompletionData {
895    let steps = match persistence.get_steps(workflow_run_id) {
896        Ok(s) => s,
897        Err(e) => {
898            tracing::warn!(
899                "Failed to fetch steps for child workflow run '{}': {e}",
900                workflow_run_id,
901            );
902            return ((Vec::new(), String::new()), HashMap::new(), Vec::new());
903        }
904    };
905
906    // Collect completed steps once, ordered by position so context bubble-up
907    // preserves chronological order in the parent's `state.contexts`.
908    let mut completed: Vec<_> = steps
909        .into_iter()
910        .filter(|s| s.status == WorkflowStepStatus::Completed)
911        .collect();
912    completed.sort_by_key(|s| s.position);
913
914    let final_output = match completed.iter().max_by_key(|s| s.position) {
915        Some(step) => {
916            let markers = parse_markers_out(step.markers_out.as_deref(), &step.step_name);
917            let context = step.context_out.clone().unwrap_or_default();
918            (markers, context)
919        }
920        None => (Vec::new(), String::new()),
921    };
922
923    // Build the bubble-up step-results map AND the contexts list from the
924    // same StepSuccess values. The map is keyed by step name (used for
925    // `if step.marker` checks); the list preserves chronological order
926    // (used as `prior_contexts` in agent prompts).
927    let mut child_steps = HashMap::with_capacity(completed.len());
928    let mut child_contexts = Vec::with_capacity(completed.len());
929    for s in completed {
930        let markers = parse_markers_out(s.markers_out.as_deref(), &s.step_name);
931        let context = s.context_out.clone().unwrap_or_default();
932        let success = crate::types::StepSuccess::from_workflow_run_step(
933            s.step_name.clone(),
934            &s,
935            markers,
936            context,
937            0,
938        );
939        let result = StepResult::completed_without_metrics(&success);
940        let entry: ContextEntry = success.into();
941        child_steps.insert(s.step_name, result);
942        child_contexts.push(entry);
943    }
944
945    (final_output, child_steps, child_contexts)
946}
947
948/// Check whether the loop is stuck (identical marker sets for `stuck_after` consecutive
949/// iterations). Returns `Err` if stuck, `Ok(())` otherwise.
950pub fn check_stuck(
951    state: &mut ExecutionState,
952    prev_marker_sets: &mut VecDeque<HashSet<String>>,
953    step: &str,
954    marker: &str,
955    stuck_after: u32,
956    loop_kind: &str,
957) -> Result<()> {
958    let current_markers: HashSet<String> = state
959        .step_results
960        .get(step)
961        .map(|r| r.markers.iter().cloned().collect())
962        .unwrap_or_default();
963
964    prev_marker_sets.push_back(current_markers.clone());
965    if prev_marker_sets.len() > stuck_after as usize {
966        prev_marker_sets.pop_front();
967    }
968
969    if prev_marker_sets.len() >= stuck_after as usize
970        && prev_marker_sets.iter().all(|s| s == &current_markers)
971    {
972        tracing::warn!(
973            "{loop_kind} {step}.{marker} — stuck: identical markers for {stuck_after} consecutive iterations",
974        );
975        state.all_succeeded = false;
976        return Err(EngineError::Workflow(format!(
977            "{loop_kind} {step}.{marker} stuck after {stuck_after} iterations with identical markers",
978        )));
979    }
980
981    Ok(())
982}
983
984/// Check whether the loop has exceeded `max_iterations`.
985pub fn check_max_iterations(
986    state: &mut ExecutionState,
987    iteration: u32,
988    max_iterations: u32,
989    on_max_iter: &crate::dsl::OnMaxIter,
990    step: &str,
991    marker: &str,
992    loop_kind: &str,
993) -> Result<bool> {
994    if iteration >= max_iterations {
995        tracing::warn!("{loop_kind} {step}.{marker} — reached max_iterations ({max_iterations})",);
996        match on_max_iter {
997            crate::dsl::OnMaxIter::Fail => {
998                state.all_succeeded = false;
999                return Err(EngineError::Workflow(format!(
1000                    "{loop_kind} {step}.{marker} reached max_iterations ({max_iterations})",
1001                )));
1002            }
1003            crate::dsl::OnMaxIter::Continue => return Ok(true),
1004        }
1005    }
1006    Ok(false)
1007}
1008
/// Build the variable map from execution state for substitution.
///
/// Thin façade over [`crate::prompt_builder::build_variable_map`] so executor
/// code in this module can stay decoupled from the prompt-builder path.
pub fn build_variable_map(state: &ExecutionState) -> HashMap<String, String> {
    crate::prompt_builder::build_variable_map(state)
}
1013
/// Generate the FLOW_OUTPUT instruction (used when no schema is set).
///
/// Returns the shared static instruction text from `crate::constants`; no
/// allocation or per-call work is involved.
pub fn flow_output_instruction() -> &'static str {
    FLOW_OUTPUT_INSTRUCTION
}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use crate::dsl::{InputDecl, InputType, WorkflowDef, WorkflowTrigger};
1023
1024    fn make_bool_workflow(
1025        name: &str,
1026        input_name: &str,
1027        required: bool,
1028        default: Option<&str>,
1029    ) -> WorkflowDef {
1030        WorkflowDef {
1031            name: name.to_string(),
1032            title: None,
1033            description: String::new(),
1034            trigger: WorkflowTrigger::Manual,
1035            targets: vec![],
1036            group: None,
1037            inputs: vec![InputDecl {
1038                name: input_name.to_string(),
1039                input_type: InputType::Boolean,
1040                required,
1041                default: default.map(|s| s.to_string()),
1042                description: None,
1043            }],
1044            body: vec![],
1045            always: vec![],
1046            source_path: String::new(),
1047        }
1048    }
1049
1050    #[test]
1051    fn test_boolean_input_defaults_to_false_when_absent() {
1052        let workflow = make_bool_workflow("wf", "flag", false, None);
1053        let mut inputs = HashMap::new();
1054        apply_workflow_input_defaults(&workflow, &mut inputs).unwrap();
1055        assert_eq!(inputs.get("flag").map(|s| s.as_str()), Some("false"));
1056    }
1057
1058    #[test]
1059    fn test_boolean_input_uses_explicit_default_over_false() {
1060        let workflow = make_bool_workflow("wf", "flag", false, Some("true"));
1061        let mut inputs = HashMap::new();
1062        apply_workflow_input_defaults(&workflow, &mut inputs).unwrap();
1063        assert_eq!(inputs.get("flag").map(|s| s.as_str()), Some("true"));
1064    }
1065
1066    #[test]
1067    fn test_boolean_input_caller_value_not_overwritten() {
1068        let workflow = make_bool_workflow("wf", "flag", false, None);
1069        let mut inputs = HashMap::new();
1070        inputs.insert("flag".to_string(), "true".to_string());
1071        apply_workflow_input_defaults(&workflow, &mut inputs).unwrap();
1072        assert_eq!(inputs.get("flag").map(|s| s.as_str()), Some("true"));
1073    }
1074
1075    #[test]
1076    fn test_boolean_input_required_and_missing_is_error() {
1077        let workflow = make_bool_workflow("wf", "flag", true, None);
1078        let mut inputs = HashMap::new();
1079        let result = apply_workflow_input_defaults(&workflow, &mut inputs);
1080        assert!(result.is_err(), "expected error for missing required input");
1081    }
1082
1083    #[test]
1084    fn fork_child_resets_runtime_state_and_preserves_shared_config() {
1085        use crate::cancellation::CancellationToken;
1086        use crate::persistence_memory::InMemoryWorkflowPersistence;
1087        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
1088        use crate::types::WorkflowExecConfig;
1089
1090        struct DummyChildRunner;
1091        impl ChildWorkflowRunner for DummyChildRunner {
1092            fn execute_child(
1093                &self,
1094                _workflow_name: &str,
1095                _parent_ctx: &ChildWorkflowContext,
1096                _params: ChildWorkflowInput,
1097            ) -> Result<crate::types::WorkflowResult> {
1098                unimplemented!()
1099            }
1100            fn resume_child(
1101                &self,
1102                _workflow_run_id: &str,
1103                _model: Option<&str>,
1104                _parent_ctx: &ChildWorkflowContext,
1105            ) -> Result<crate::types::WorkflowResult> {
1106                unimplemented!()
1107            }
1108            fn find_resumable_child(
1109                &self,
1110                _parent_run_id: &str,
1111                _workflow_name: &str,
1112            ) -> Result<Option<crate::types::WorkflowRun>> {
1113                unimplemented!()
1114            }
1115        }
1116
1117        let parent = ExecutionState {
1118            persistence: Arc::new(InMemoryWorkflowPersistence::new()),
1119            action_registry: Arc::new(crate::traits::action_executor::ActionRegistry::new(
1120                HashMap::new(),
1121                None,
1122            )),
1123            script_env_provider: Arc::new(NoOpScriptEnvProvider),
1124            workflow_run_id: "run-1".to_string(),
1125            workflow_name: "wf".to_string(),
1126            run_ctx: {
1127                let mut vars = std::collections::HashMap::new();
1128                vars.insert("worktree_id", "wt".to_string());
1129                vars.insert("repo_path", "/repo".to_string());
1130                vars.insert("ticket_id", "TICK-1".to_string());
1131                vars.insert("repo_id", "repo-1".to_string());
1132                Arc::new(
1133                    crate::traits::run_context::NoopRunContext::with_vars(vars)
1134                        .with_working_dir("/tmp"),
1135                ) as Arc<dyn RunContext>
1136            },
1137            extra_plugin_dirs: vec!["plugins".to_string()],
1138            model: Some("gpt-4".to_string()),
1139            exec_config: WorkflowExecConfig::default(),
1140            inputs: {
1141                let mut m = HashMap::new();
1142                m.insert("key".to_string(), "val".to_string());
1143                m
1144            },
1145            parent_run_id: "parent-1".to_string(),
1146            depth: 3,
1147            target_label: Some("label".to_string()),
1148            step_results: {
1149                let mut m = HashMap::new();
1150                m.insert("step".to_string(), StepResult::default());
1151                m
1152            },
1153            contexts: vec![ContextEntry {
1154                step: "step".to_string(),
1155                iteration: 1,
1156                context: "ctx".to_string(),
1157                markers: vec![],
1158                structured_output: None,
1159                output_file: None,
1160            }],
1161            position: 42,
1162            all_succeeded: false,
1163            total_cost: 1.23,
1164            total_turns: 5,
1165            total_duration_ms: 1000,
1166            total_input_tokens: 100,
1167            total_output_tokens: 200,
1168            total_cache_read_input_tokens: 50,
1169            total_cache_creation_input_tokens: 25,
1170            has_llm_metrics: false,
1171            last_gate_feedback: Some("feedback".to_string()),
1172            block_output: Some("output".to_string()),
1173            block_with: vec!["with".to_string()],
1174            resume_ctx: None,
1175            default_as_identity: Some("bot".to_string()),
1176            triggered_by_hook: true,
1177            schema_resolver: None,
1178            child_runner: Some(Arc::new(DummyChildRunner)),
1179            last_heartbeat_at: ExecutionState::new_heartbeat(),
1180            registry: Arc::new(crate::traits::item_provider::ItemProviderRegistry::new()),
1181            event_sinks: Arc::from(vec![]),
1182            cancellation: CancellationToken::new(),
1183            current_execution_id: Arc::new(std::sync::Mutex::new(None)),
1184            owner_token: None,
1185            lease_generation: None,
1186        };
1187
1188        let child_cancellation = CancellationToken::new();
1189        let child = parent.fork_child(child_cancellation.clone());
1190
1191        // Shared config cloned
1192        assert_eq!(child.workflow_run_id, "run-1");
1193        assert_eq!(child.workflow_name, "wf");
1194        assert_eq!(child.run_ctx.working_dir_str(), "/tmp");
1195        assert_eq!(child.model, Some("gpt-4".to_string()));
1196        assert_eq!(child.depth, 3);
1197        assert_eq!(child.target_label, Some("label".to_string()));
1198        assert_eq!(child.default_as_identity, Some("bot".to_string()));
1199        assert_eq!(child.parent_run_id, "parent-1");
1200
1201        // Runtime state reset
1202        assert!(child.inputs.is_empty(), "inputs should be cleared");
1203        assert!(
1204            child.step_results.is_empty(),
1205            "step_results should be cleared"
1206        );
1207        assert!(child.contexts.is_empty(), "contexts should be cleared");
1208        assert_eq!(child.position, 0);
1209        assert!(child.all_succeeded);
1210        assert_eq!(child.total_cost, 0.0);
1211        assert_eq!(child.total_turns, 0);
1212        assert_eq!(child.total_duration_ms, 0);
1213        assert_eq!(child.total_input_tokens, 0);
1214        assert_eq!(child.total_output_tokens, 0);
1215        assert_eq!(child.total_cache_read_input_tokens, 0);
1216        assert_eq!(child.total_cache_creation_input_tokens, 0);
1217        assert!(
1218            !child.has_llm_metrics,
1219            "has_llm_metrics should be reset in fork_child"
1220        );
1221        assert!(child.last_gate_feedback.is_none());
1222        assert!(child.block_output.is_none());
1223        assert!(child.block_with.is_empty());
1224        assert!(child.resume_ctx.is_none());
1225        assert!(!child.triggered_by_hook);
1226        assert!(child.schema_resolver.is_none());
1227        assert!(
1228            child.child_runner.is_some(),
1229            "child_runner should be cloned from parent"
1230        );
1231
1232        // Cancellation replaced
1233        assert!(!child.cancellation.is_cancelled());
1234        assert!(std::sync::Arc::ptr_eq(
1235            &child.current_execution_id,
1236            &child.current_execution_id
1237        ));
1238    }
1239
    /// `child_workflow_context()` must project the parent's fields verbatim
    /// and share (not deep-copy) the event-sink slice.
    #[test]
    fn child_workflow_context_projects_fields() {
        use crate::cancellation::CancellationToken;
        use crate::events::{EngineEventData, EventSink};
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
        use crate::types::WorkflowExecConfig;

        // Minimal sink: the test only needs pointer identity, not delivery.
        struct TestSink;
        impl EventSink for TestSink {
            fn emit(&self, _: &EngineEventData) {}
        }

        // Two sinks so the length check below is non-trivial.
        let sinks: Arc<[Arc<dyn EventSink>]> = Arc::from(vec![
            Arc::new(TestSink) as Arc<dyn EventSink>,
            Arc::new(TestSink) as Arc<dyn EventSink>,
        ]);

        let mut state_inputs = HashMap::new();
        state_inputs.insert("ticket_id".to_string(), "TICK-42".to_string());
        state_inputs.insert("repo_id".to_string(), "repo-7".to_string());

        // Distinguishable by some non-default field; event_sinks below is the primary check.
        let exec_config = WorkflowExecConfig {
            dry_run: true,
            ..WorkflowExecConfig::default()
        };

        let parent = ExecutionState {
            persistence: Arc::new(InMemoryWorkflowPersistence::new()),
            action_registry: Arc::new(crate::traits::action_executor::ActionRegistry::new(
                HashMap::new(),
                None,
            )),
            script_env_provider: Arc::new(NoOpScriptEnvProvider),
            workflow_run_id: "run-projection-test".to_string(),
            workflow_name: "wf-projection".to_string(),
            run_ctx: {
                let mut vars = std::collections::HashMap::new();
                vars.insert("worktree_id", "wt-9".to_string());
                vars.insert("repo_path", "/repo/proj".to_string());
                vars.insert("ticket_id", "TICK-42".to_string());
                vars.insert("repo_id", "repo-7".to_string());
                Arc::new(
                    crate::traits::run_context::NoopRunContext::with_vars(vars)
                        .with_working_dir("/tmp/proj"),
                ) as Arc<dyn RunContext>
            },
            extra_plugin_dirs: vec!["plugin-a".to_string()],
            model: Some("opus".to_string()),
            exec_config: exec_config.clone(),
            inputs: state_inputs.clone(),
            parent_run_id: "parent-7".to_string(),
            depth: 2,
            target_label: Some("proj-label".to_string()),
            step_results: HashMap::new(),
            contexts: vec![],
            position: 11,
            all_succeeded: false,
            total_cost: 0.0,
            total_turns: 0,
            total_duration_ms: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_cache_read_input_tokens: 0,
            total_cache_creation_input_tokens: 0,
            has_llm_metrics: false,
            last_gate_feedback: None,
            block_output: None,
            block_with: vec![],
            resume_ctx: None,
            default_as_identity: None,
            triggered_by_hook: true,
            schema_resolver: None,
            child_runner: None,
            last_heartbeat_at: ExecutionState::new_heartbeat(),
            registry: Arc::new(crate::traits::item_provider::ItemProviderRegistry::new()),
            event_sinks: Arc::clone(&sinks),
            cancellation: CancellationToken::new(),
            current_execution_id: Arc::new(std::sync::Mutex::new(None)),
            owner_token: None,
            lease_generation: None,
        };

        let ctx = parent.child_workflow_context();

        // All fields project verbatim.
        assert_eq!(ctx.run_ctx.get("worktree_id").as_deref(), Some("wt-9"));
        assert_eq!(ctx.run_ctx.working_dir_str(), "/tmp/proj");
        assert_eq!(ctx.run_ctx.get("repo_path").as_deref(), Some("/repo/proj"));
        assert_eq!(ctx.run_ctx.get("ticket_id").as_deref(), Some("TICK-42"));
        assert_eq!(ctx.run_ctx.get("repo_id").as_deref(), Some("repo-7"));
        assert_eq!(ctx.extra_plugin_dirs, vec!["plugin-a"]);
        assert_eq!(ctx.workflow_run_id, "run-projection-test");
        assert_eq!(ctx.model.as_deref(), Some("opus"));
        assert!(ctx.exec_config.dry_run);
        assert_eq!(ctx.inputs, state_inputs);

        // event_sinks slice is shared, not deep-copied.
        assert_eq!(ctx.event_sinks.len(), 2);
        assert!(
            Arc::ptr_eq(&ctx.event_sinks, &sinks),
            "event_sinks slice should be shared via Arc, not cloned"
        );
    }
1345
1346    use crate::test_helpers::CountingPersistence;
1347
1348    /// Build a minimal ExecutionState wired to a CountingPersistence.
1349    fn make_state_with_counting_persistence(
1350        cp: std::sync::Arc<CountingPersistence>,
1351        run_id: String,
1352    ) -> ExecutionState {
1353        crate::test_helpers::make_test_execution_state(
1354            cp as Arc<dyn crate::traits::persistence::WorkflowPersistence>,
1355            run_id,
1356        )
1357    }
1358
1359    /// When persistence reports the run cancelled, the helper sets
1360    /// `state.cancellation` and returns `Err(Cancelled)`.
1361    #[test]
1362    fn check_cancellation_throttled_propagates_external_cancel() {
1363        let cp = Arc::new(CountingPersistence::new());
1364        cp.set_cancelled(true);
1365        let state = make_state_with_counting_persistence(Arc::clone(&cp), "run-1".into());
1366
1367        assert!(!state.cancellation.is_cancelled());
1368        let result = state.check_cancellation_throttled();
1369        assert!(
1370            matches!(result, Err(EngineError::Cancelled(_))),
1371            "expected Err(Cancelled), got {result:?}"
1372        );
1373        assert!(
1374            state.cancellation.is_cancelled(),
1375            "helper must set state.cancellation on external cancel"
1376        );
1377    }
1378
1379    #[test]
1380    fn check_stuck_bounds_buffer() {
1381        use crate::persistence_memory::InMemoryWorkflowPersistence;
1382        use crate::types::StepResult;
1383
1384        let mut state = crate::test_helpers::make_test_execution_state(
1385            Arc::new(InMemoryWorkflowPersistence::new()),
1386            "run-bounds".into(),
1387        );
1388
1389        let stuck_after = 3u32;
1390        let mut prev_marker_sets: VecDeque<HashSet<String>> = VecDeque::new();
1391
1392        for i in 0u32..10 {
1393            let result = StepResult {
1394                markers: vec![format!("marker-{i}")],
1395                ..Default::default()
1396            };
1397            state.step_results.insert("step".to_string(), result);
1398
1399            let res = check_stuck(
1400                &mut state,
1401                &mut prev_marker_sets,
1402                "step",
1403                "m",
1404                stuck_after,
1405                "while",
1406            );
1407            assert!(
1408                res.is_ok(),
1409                "should not be stuck with changing markers at iteration {i}"
1410            );
1411            assert!(
1412                prev_marker_sets.len() <= stuck_after as usize,
1413                "buffer exceeded stuck_after at iteration {i}: len={}",
1414                prev_marker_sets.len()
1415            );
1416        }
1417    }
1418
1419    #[test]
1420    fn check_stuck_detects_stuck() {
1421        use crate::persistence_memory::InMemoryWorkflowPersistence;
1422        use crate::types::StepResult;
1423
1424        let mut state = crate::test_helpers::make_test_execution_state(
1425            Arc::new(InMemoryWorkflowPersistence::new()),
1426            "run-stuck".into(),
1427        );
1428
1429        let stuck_after = 3u32;
1430        let mut prev_marker_sets: VecDeque<HashSet<String>> = VecDeque::new();
1431
1432        let step = StepResult {
1433            markers: vec!["same-marker".to_string()],
1434            ..Default::default()
1435        };
1436        state.step_results.insert("step".to_string(), step);
1437
1438        for i in 0u32..stuck_after {
1439            let res = check_stuck(
1440                &mut state,
1441                &mut prev_marker_sets,
1442                "step",
1443                "m",
1444                stuck_after,
1445                "while",
1446            );
1447            if i + 1 < stuck_after {
1448                assert!(res.is_ok(), "should not be stuck yet at iteration {i}");
1449            } else {
1450                assert!(res.is_err(), "should detect stuck at iteration {i}");
1451            }
1452        }
1453    }
1454
    /// End-to-end check that child-run steps bubble up sorted by position,
    /// carrying markers, context, and structured output to the parent.
    #[test]
    fn fetch_child_completion_data_bubbles_contexts_in_position_order() {
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::persistence::{NewStep, StepUpdate};

        let p = InMemoryWorkflowPersistence::new();
        let child_run = "01CHILDRUNID0000000000000";
        p.seed_run(child_run);

        // Insert two steps in reverse position order to verify the function
        // sorts before bubbling — without sorting, contexts would land in
        // insertion order, which doesn't match the chronological order
        // parent-side agents expect.
        let step_b = p
            .insert_step(NewStep {
                workflow_run_id: child_run.to_string(),
                step_name: "step-b".to_string(),
                role: "actor".to_string(),
                can_commit: false,
                position: 2,
                iteration: 0,
                retry_count: Some(0),
            })
            .unwrap();
        let step_a = p
            .insert_step(NewStep {
                workflow_run_id: child_run.to_string(),
                step_name: "step-a".to_string(),
                role: "actor".to_string(),
                can_commit: false,
                position: 1,
                iteration: 0,
                retry_count: Some(0),
            })
            .unwrap();

        // Mark both steps completed with distinct markers, context_out, and
        // structured_output so each assertion below is traceable to one step.
        p.update_step(
            &step_a,
            StepUpdate::completed(
                0,
                None,
                Some("a-result".into()),
                Some("context-from-a".into()),
                Some(r#"["m-a"]"#.into()),
                0,
                Some(r#"{"k":"a"}"#.into()),
            ),
        )
        .unwrap();
        p.update_step(
            &step_b,
            StepUpdate::completed(
                0,
                None,
                Some("b-result".into()),
                Some("context-from-b".into()),
                Some(r#"["m-b"]"#.into()),
                0,
                Some(r#"{"k":"b"}"#.into()),
            ),
        )
        .unwrap();

        let ((final_markers, final_context), step_results, child_contexts) =
            fetch_child_completion_data(&p, child_run);

        // Final output is the highest-position step (step-b at position 2).
        assert_eq!(final_markers, vec!["m-b".to_string()]);
        assert_eq!(final_context, "context-from-b");

        // Both steps reachable by name in the bubble-up step_results map.
        assert!(step_results.contains_key("step-a"));
        assert!(step_results.contains_key("step-b"));

        // Child contexts bubbled in position order regardless of insert order,
        // and carry the per-step context_out + structured_output so parent
        // agents downstream of call_workflow can read them via prior_contexts.
        assert_eq!(child_contexts.len(), 2);
        assert_eq!(child_contexts[0].step, "step-a");
        assert_eq!(child_contexts[0].context, "context-from-a");
        assert_eq!(child_contexts[0].markers, vec!["m-a".to_string()]);
        assert_eq!(
            child_contexts[0].structured_output.as_deref(),
            Some(r#"{"k":"a"}"#)
        );
        assert_eq!(child_contexts[1].step, "step-b");
        assert_eq!(child_contexts[1].context, "context-from-b");
        assert_eq!(child_contexts[1].markers, vec!["m-b".to_string()]);
        assert_eq!(
            child_contexts[1].structured_output.as_deref(),
            Some(r#"{"k":"b"}"#)
        );
    }
1548
1549    #[test]
1550    fn fetch_child_completion_data_returns_empty_contexts_on_persistence_error() {
1551        use crate::persistence_memory::InMemoryWorkflowPersistence;
1552
1553        let p = InMemoryWorkflowPersistence::new();
1554        p.set_fail_get_steps(true);
1555
1556        let ((markers, context), step_results, child_contexts) =
1557            fetch_child_completion_data(&p, "any-run-id");
1558
1559        assert!(markers.is_empty());
1560        assert!(context.is_empty());
1561        assert!(step_results.is_empty());
1562        assert!(child_contexts.is_empty());
1563    }
1564
1565    #[test]
1566    fn child_workflow_context_new_sets_required_fields_and_zeros_optional() {
1567        use crate::traits::run_context::NoopRunContext;
1568        use crate::types::WorkflowExecConfig;
1569
1570        let run_ctx = Arc::new(NoopRunContext::default()) as Arc<dyn RunContext>;
1571        let ctx = ChildWorkflowContext::new(
1572            Arc::clone(&run_ctx),
1573            vec!["plugins".to_string()],
1574            "run-42".to_string(),
1575            Some("gpt-4".to_string()),
1576            WorkflowExecConfig::default(),
1577            HashMap::new(),
1578            Arc::from(vec![]),
1579        );
1580
1581        assert_eq!(ctx.workflow_run_id, "run-42");
1582        assert_eq!(ctx.extra_plugin_dirs, vec!["plugins"]);
1583        assert_eq!(ctx.model.as_deref(), Some("gpt-4"));
1584        assert!(ctx.inputs.is_empty());
1585        assert_eq!(ctx.event_sinks.len(), 0);
1586    }
1587}