1use std::collections::{HashMap, HashSet, VecDeque};
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::cancellation::CancellationToken;
7use crate::constants::FLOW_OUTPUT_INSTRUCTION;
8use crate::dsl::{InputType, OnFail, WorkflowDef, WorkflowNode};
9use crate::engine_error::{EngineError, Result};
10use crate::events::{EngineEvent, EventSink};
11use crate::extensions::{Extensions, LlmRunMetrics};
12use crate::output_schema::OutputSchema;
13use crate::status::{WorkflowRunStatus, WorkflowStepStatus};
14use crate::traits::action_executor::ActionRegistry;
15use crate::traits::item_provider::ItemProviderRegistry;
16use crate::traits::persistence::WorkflowPersistence;
17use crate::traits::run_context::RunContext;
18use crate::traits::script_env_provider::ScriptEnvProvider;
19use crate::types::{
20 ContextEntry, StepKey, StepResult, WorkflowExecConfig, WorkflowResult, WorkflowRunStep,
21};
22
/// Previously recorded steps that are consulted when a workflow run is resumed.
#[derive(Clone)]
pub struct ResumeContext {
    /// Recorded steps, keyed first by step name/key and then by iteration number.
    ///
    /// `should_skip` only tests for the presence of an entry here, while
    /// `restore_completed_step` reads the entry back to rebuild in-memory results.
    pub step_map: HashMap<String, HashMap<u32, WorkflowRunStep>>,
}
29
/// Mutable state threaded through the execution of a single workflow run.
///
/// It bundles the shared collaborators (persistence, registries, event sinks),
/// the identity and configuration of the run, and the results and metric totals
/// that accumulate while nodes execute.
#[derive(Clone)]
pub struct ExecutionState {
    /// Persistence backend; in this file it is polled for external cancellation
    /// and receives the final run status.
    pub persistence: Arc<dyn WorkflowPersistence>,
    /// Registry of action executors.
    /// NOTE(review): not used in this section — presumably consulted by the call executor.
    pub action_registry: Arc<ActionRegistry>,
    /// Provider of environment for script steps.
    /// NOTE(review): not used in this section — confirm against the script executor.
    pub script_env_provider: Arc<dyn ScriptEnvProvider>,
    /// Identifier of this run; passed to persistence calls and event emission.
    pub workflow_run_id: String,
    /// Name of the workflow being executed.
    pub workflow_name: String,
    /// Shared run context; handed on to child workflows via `child_workflow_context`.
    pub run_ctx: Arc<dyn RunContext>,
    /// Additional plugin directories; copied into the child workflow context.
    pub extra_plugin_dirs: Vec<String>,
    /// Optional model name; reported in the run's `LlmRunMetrics`.
    pub model: Option<String>,
    /// Execution configuration; `fail_fast` is consulted by `execute_nodes` and
    /// `record_step_failure`.
    pub exec_config: WorkflowExecConfig,
    /// Input variables of the run. The engine also inserts transient keys here
    /// (`workflow_status`, `failed_step`, `failure_reason`, `retry_count`).
    pub inputs: HashMap<String, String>,
    /// Run id of the parent. NOTE(review): presumably empty for top-level runs — confirm.
    pub parent_run_id: String,
    /// NOTE(review): presumably the nesting depth of this run within child workflows — confirm.
    pub depth: u32,
    /// Optional target label. NOTE(review): meaning not visible in this section.
    pub target_label: Option<String>,
    /// Results of executed (or restored) steps, keyed by step key.
    pub step_results: HashMap<String, StepResult>,
    /// Context entries appended in order as steps succeed or are restored on resume.
    pub contexts: Vec<ContextEntry>,
    /// Position counter; reset to `0` by `fork_child`.
    /// NOTE(review): advanced outside this section — presumably when steps are recorded.
    pub position: i64,
    /// `false` once any step or loop check has failed.
    pub all_succeeded: bool,
    /// Running total of the cost metric reported by steps (logged as dollars).
    pub total_cost: f64,
    /// Running total of turns reported by steps.
    pub total_turns: i64,
    /// Running total of step durations, in milliseconds.
    pub total_duration_ms: i64,
    /// Running total of input tokens reported by steps.
    pub total_input_tokens: i64,
    /// Running total of output tokens reported by steps.
    pub total_output_tokens: i64,
    /// Running total of cache-read input tokens reported by steps.
    pub total_cache_read_input_tokens: i64,
    /// Running total of cache-creation input tokens reported by steps.
    pub total_cache_creation_input_tokens: i64,
    /// Set by `record_step_success` once a step has contributed at least one metric.
    pub has_llm_metrics: bool,
    /// Most recent gate feedback; restored from a recorded step on resume.
    pub last_gate_feedback: Option<String>,
    /// NOTE(review): presumably the output setting of an enclosing block — confirm.
    pub block_output: Option<String>,
    /// NOTE(review): presumably the `with` list of an enclosing block — confirm.
    pub block_with: Vec<String>,
    /// Recorded steps of a previous attempt; `Some` when the run is being resumed.
    pub resume_ctx: Option<ResumeContext>,
    /// NOTE(review): presumably the identity used when a step does not set `as_identity` — confirm.
    pub default_as_identity: Option<String>,
    /// NOTE(review): presumably `true` when the run was started by a hook — confirm.
    pub triggered_by_hook: bool,
    /// Callback that resolves a schema name to an `OutputSchema`; see `resolve_schema`.
    #[allow(clippy::type_complexity)]
    pub schema_resolver: Option<Arc<dyn Fn(&str) -> Result<OutputSchema> + Send + Sync>>,
    /// Runner used to execute or resume child workflows, when configured.
    pub child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    /// Unix timestamp (seconds) of the last throttled cancellation poll; starts at `0`.
    pub last_heartbeat_at: Arc<AtomicI64>,
    /// Registry of item providers.
    /// NOTE(review): not used in this section — presumably consulted by `foreach`.
    pub registry: Arc<ItemProviderRegistry>,
    /// Sinks that receive every `EngineEvent` emitted through `emit_event`.
    pub event_sinks: Arc<[Arc<dyn EventSink>]>,
    /// Cancellation token checked before each node is executed.
    pub cancellation: CancellationToken,
    /// NOTE(review): a shared pair of strings describing the execution currently in
    /// flight — exact meaning of the two components is not visible here.
    pub current_execution_id: Arc<Mutex<Option<(String, String)>>>,
    /// NOTE(review): presumably the lease owner token of this run — confirm.
    pub owner_token: Option<String>,
    /// Lease generation; expected to be set after `FlowEngine::run`/`resume` entry
    /// (see `expect_lease_generation`).
    pub lease_generation: Option<i64>,
}
91
/// Per-invocation parameters handed to [`ChildWorkflowRunner::execute_child`].
pub struct ChildWorkflowInput {
    /// Input variables for the child workflow.
    pub inputs: HashMap<String, String>,
    /// Iteration number. NOTE(review): presumably the parent step's iteration — confirm.
    pub iteration: u32,
    /// Optional identity override. NOTE(review): semantics not visible in this section.
    pub as_identity: Option<String>,
    /// NOTE(review): presumably the nesting depth assigned to the child run — confirm.
    pub depth: u32,
    /// NOTE(review): presumably the id of the parent step that spawned the child — confirm.
    pub parent_step_id: Option<String>,
    /// Cancellation token for the child run.
    pub cancellation: CancellationToken,
}
104
/// Snapshot of parent-run state that is handed to a [`ChildWorkflowRunner`].
///
/// Built from an `ExecutionState` by `ExecutionState::child_workflow_context`.
/// The struct is `#[non_exhaustive]`, so code outside this crate has to construct
/// it through [`ChildWorkflowContext::new`].
#[non_exhaustive]
#[derive(Clone)]
pub struct ChildWorkflowContext {
    /// Run context shared with the parent.
    pub run_ctx: Arc<dyn RunContext>,
    /// Additional plugin directories copied from the parent.
    pub extra_plugin_dirs: Vec<String>,
    /// Run id of the parent workflow run.
    pub workflow_run_id: String,
    /// Optional model name copied from the parent.
    pub model: Option<String>,
    /// Execution configuration copied from the parent.
    pub exec_config: WorkflowExecConfig,
    /// Input variables of the parent at the time the context was built.
    pub inputs: HashMap<String, String>,
    /// Event sinks shared with the parent.
    pub event_sinks: Arc<[Arc<dyn EventSink>]>,
}
127
128impl ChildWorkflowContext {
129 pub fn new(
130 run_ctx: Arc<dyn RunContext>,
131 extra_plugin_dirs: Vec<String>,
132 workflow_run_id: String,
133 model: Option<String>,
134 exec_config: WorkflowExecConfig,
135 inputs: HashMap<String, String>,
136 event_sinks: Arc<[Arc<dyn EventSink>]>,
137 ) -> Self {
138 Self {
139 run_ctx,
140 extra_plugin_dirs,
141 workflow_run_id,
142 model,
143 exec_config,
144 inputs,
145 event_sinks,
146 }
147 }
148}
149
/// Abstraction over running child workflows on behalf of a parent run.
///
/// Implementations must be `Send + Sync` so a runner can be shared behind an `Arc`.
pub trait ChildWorkflowRunner: Send + Sync {
    /// Runs the workflow named `workflow_name` as a child of the run described by
    /// `parent_ctx`, using the per-invocation `params`.
    fn execute_child(
        &self,
        workflow_name: &str,
        parent_ctx: &ChildWorkflowContext,
        params: ChildWorkflowInput,
    ) -> Result<WorkflowResult>;

    /// Resumes the existing child run identified by `workflow_run_id`.
    ///
    /// NOTE(review): `model` presumably overrides the model for the resumed run — confirm.
    fn resume_child(
        &self,
        workflow_run_id: &str,
        model: Option<&str>,
        parent_ctx: &ChildWorkflowContext,
    ) -> Result<WorkflowResult>;

    /// Looks up a child run of `parent_run_id` for `workflow_name` that can be resumed.
    ///
    /// NOTE(review): `Ok(None)` presumably means no such run exists — confirm.
    fn find_resumable_child(
        &self,
        parent_run_id: &str,
        workflow_name: &str,
    ) -> Result<Option<crate::types::WorkflowRun>>;
}
172
173impl ExecutionState {
174 pub fn new_heartbeat() -> Arc<AtomicI64> {
176 Arc::new(AtomicI64::new(0))
177 }
178
179 pub fn expect_lease_generation(&self) -> i64 {
185 self.lease_generation
186 .expect("lease_generation must be set after FlowEngine::run/resume entry")
187 }
188
189 pub fn check_cancellation_throttled(&self) -> Result<()> {
210 use crate::cancellation_reason::CancellationReason;
211
212 let now_secs = SystemTime::now()
213 .duration_since(UNIX_EPOCH)
214 .unwrap_or_else(|e| {
215 tracing::warn!("system clock regressed: {e}; cancellation check suppressed");
216 e.duration()
217 })
218 .as_secs() as i64;
219 let last = self.last_heartbeat_at.load(Ordering::Relaxed);
220 if now_secs - last < 5 {
221 return Ok(());
222 }
223 self.last_heartbeat_at.store(now_secs, Ordering::Relaxed);
224 match self.persistence.is_run_cancelled(&self.workflow_run_id) {
225 Ok(true) => {
226 tracing::info!(
227 "Workflow run {} cancelled externally, stopping execution",
228 self.workflow_run_id
229 );
230 self.cancellation
231 .cancel(CancellationReason::UserRequested(None));
232 return Err(EngineError::Cancelled(CancellationReason::UserRequested(
233 None,
234 )));
235 }
236 Ok(false) => {}
237 Err(e) => {
238 tracing::warn!(
239 "Database error during cancellation check for workflow run {}: {}",
240 self.workflow_run_id,
241 e
242 );
243 }
244 }
245 Ok(())
246 }
247
248 pub fn child_workflow_context(&self) -> ChildWorkflowContext {
251 ChildWorkflowContext {
252 run_ctx: Arc::clone(&self.run_ctx),
253 extra_plugin_dirs: self.extra_plugin_dirs.clone(),
254 workflow_run_id: self.workflow_run_id.clone(),
255 model: self.model.clone(),
256 exec_config: self.exec_config.clone(),
257 inputs: self.inputs.clone(),
258 event_sinks: Arc::clone(&self.event_sinks),
259 }
260 }
261
262 pub fn fork_child(&self, cancellation: CancellationToken) -> ExecutionState {
267 let mut child = self.clone();
268 child.inputs.clear();
269 child.step_results.clear();
270 child.contexts.clear();
271 child.position = 0;
272 child.all_succeeded = true;
273 child.total_cost = 0.0;
274 child.total_turns = 0;
275 child.total_duration_ms = 0;
276 child.total_input_tokens = 0;
277 child.total_output_tokens = 0;
278 child.total_cache_read_input_tokens = 0;
279 child.total_cache_creation_input_tokens = 0;
280 child.has_llm_metrics = false;
281 child.last_gate_feedback = None;
282 child.block_output = None;
283 child.block_with.clear();
284 child.resume_ctx = None;
285 child.triggered_by_hook = false;
286 child.last_heartbeat_at = Self::new_heartbeat();
287 child.cancellation = cancellation;
288 child.current_execution_id = Arc::new(std::sync::Mutex::new(None));
289 child.owner_token = None;
290 child.lease_generation = None;
291 child
292 }
293
294 #[allow(clippy::too_many_arguments)]
298 pub fn accumulate_metrics(
299 &mut self,
300 cost: Option<f64>,
301 turns: Option<i64>,
302 duration: Option<i64>,
303 input_tokens: Option<i64>,
304 output_tokens: Option<i64>,
305 cache_read: Option<i64>,
306 cache_create: Option<i64>,
307 ) -> bool {
308 let mut changed = false;
309 if let Some(c) = cost {
310 self.total_cost += c;
311 changed = true;
312 }
313 if let Some(t) = turns {
314 self.total_turns += t;
315 changed = true;
316 }
317 if let Some(d) = duration {
318 self.total_duration_ms += d;
319 changed = true;
320 }
321 if let Some(t) = input_tokens {
322 self.total_input_tokens += t;
323 changed = true;
324 }
325 if let Some(t) = output_tokens {
326 self.total_output_tokens += t;
327 changed = true;
328 }
329 if let Some(t) = cache_read {
330 self.total_cache_read_input_tokens += t;
331 changed = true;
332 }
333 if let Some(t) = cache_create {
334 self.total_cache_creation_input_tokens += t;
335 changed = true;
336 }
337 changed
338 }
339}
340
341pub fn resolve_schema(state: &ExecutionState, name: &str) -> Result<OutputSchema> {
343 match &state.schema_resolver {
344 Some(resolver) => resolver(name),
345 None => Err(EngineError::Workflow(format!(
346 "No schema resolver configured — cannot load schema '{name}'"
347 ))),
348 }
349}
350
351pub fn emit_event(state: &ExecutionState, event: EngineEvent) {
356 crate::events::emit_to_sinks(&state.workflow_run_id, event, &state.event_sinks);
357}
358
359pub fn completed_keys_from_steps(steps: &[WorkflowRunStep]) -> HashSet<StepKey> {
361 steps
362 .iter()
363 .filter(|s| s.status == WorkflowStepStatus::Completed)
364 .map(|s| (s.step_name.clone(), s.iteration as u32))
365 .collect()
366}
367
368pub fn apply_workflow_input_defaults(
370 workflow: &WorkflowDef,
371 inputs: &mut HashMap<String, String>,
372) -> Result<()> {
373 for input_decl in &workflow.inputs {
374 if input_decl.required && !inputs.contains_key(&input_decl.name) {
375 return Err(EngineError::Workflow(format!(
376 "Missing required input: '{}'. Use --input {}=<value>.",
377 input_decl.name, input_decl.name
378 )));
379 }
380 if let Some(ref default) = input_decl.default {
381 inputs
382 .entry(input_decl.name.clone())
383 .or_insert_with(|| default.clone());
384 }
385 if input_decl.input_type == InputType::Boolean {
386 inputs
387 .entry(input_decl.name.clone())
388 .or_insert_with(|| "false".to_string());
389 }
390 }
391 Ok(())
392}
393
394pub fn run_workflow_engine(
396 state: &mut ExecutionState,
397 workflow: &WorkflowDef,
398) -> Result<WorkflowResult> {
399 if state.resume_ctx.is_some() {
401 emit_event(
402 state,
403 EngineEvent::RunResumed {
404 workflow_name: workflow.name.clone(),
405 },
406 );
407 } else {
408 emit_event(
409 state,
410 EngineEvent::RunStarted {
411 workflow_name: workflow.name.clone(),
412 },
413 );
414 }
415
416 let mut body_error: Option<String> = None;
418 let body_result = execute_nodes(state, &workflow.body, true);
419 if let Err(ref e) = body_result {
420 let msg = e.to_string();
421 tracing::error!("Body execution error: {msg}");
422 state.all_succeeded = false;
423 body_error = Some(msg);
424 if matches!(
428 e,
429 EngineError::Cancelled(crate::cancellation_reason::CancellationReason::LeaseLost)
430 ) {
431 state
432 .cancellation
433 .cancel(crate::cancellation_reason::CancellationReason::LeaseLost);
434 }
435 }
436
437 if !workflow.always.is_empty() {
439 let workflow_status = if state.all_succeeded {
440 "completed"
441 } else {
442 "failed"
443 };
444 state
445 .inputs
446 .insert("workflow_status".to_string(), workflow_status.to_string());
447 let saved_all_succeeded = state.all_succeeded;
449 let always_result = execute_nodes(state, &workflow.always, false);
450 state.all_succeeded = saved_all_succeeded;
451 if let Err(ref e) = always_result {
452 tracing::warn!("Always block error (non-fatal): {e}");
453 }
454 }
455
456 let mut summary = crate::helpers::build_workflow_summary(state);
458 if let Some(ref err) = body_error {
459 summary.push_str(&format!("\nError: {err}"));
460 }
461
462 let wf_run_id = state.workflow_run_id.clone();
464 let is_cancelled = matches!(&body_result, Err(EngineError::Cancelled(_)));
465
466 emit_event(
467 state,
468 EngineEvent::MetricsUpdated {
469 total_cost: state.total_cost,
470 total_turns: state.total_turns,
471 total_duration_ms: state.total_duration_ms,
472 },
473 );
474
475 if state.all_succeeded {
476 state.persistence.update_run_status(
477 &wf_run_id,
478 WorkflowRunStatus::Completed,
479 Some(&summary),
480 None,
481 )?;
482 tracing::info!("Workflow '{}' completed successfully", workflow.name);
483 emit_event(state, EngineEvent::RunCompleted { succeeded: true });
484 } else if is_cancelled {
485 let cancel_reason = state
486 .cancellation
487 .reason()
488 .unwrap_or(crate::cancellation_reason::CancellationReason::UserRequested(None));
489 state.persistence.update_run_status(
490 &wf_run_id,
491 WorkflowRunStatus::Cancelled,
492 Some(&summary),
493 body_error.as_deref(),
494 )?;
495 tracing::warn!("Workflow '{}' was cancelled", workflow.name);
496 emit_event(
497 state,
498 EngineEvent::RunCancelled {
499 reason: cancel_reason,
500 },
501 );
502 } else {
503 state.persistence.update_run_status(
504 &wf_run_id,
505 WorkflowRunStatus::Failed,
506 Some(&summary),
507 body_error.as_deref(),
508 )?;
509 tracing::warn!("Workflow '{}' finished with failures", workflow.name);
510 emit_event(state, EngineEvent::RunCompleted { succeeded: false });
511 }
512
513 tracing::info!(
514 "Total: ${:.4}, {} turns, {:.1}s",
515 state.total_cost,
516 state.total_turns,
517 state.total_duration_ms as f64 / 1000.0
518 );
519
520 let mut result_extensions = Extensions::default();
521 if state.has_llm_metrics {
522 let metrics = LlmRunMetrics {
523 total_input_tokens: (state.total_input_tokens != 0).then_some(state.total_input_tokens),
524 total_output_tokens: (state.total_output_tokens != 0)
525 .then_some(state.total_output_tokens),
526 total_cache_read_input_tokens: (state.total_cache_read_input_tokens != 0)
527 .then_some(state.total_cache_read_input_tokens),
528 total_cache_creation_input_tokens: (state.total_cache_creation_input_tokens != 0)
529 .then_some(state.total_cache_creation_input_tokens),
530 total_turns: (state.total_turns != 0).then_some(state.total_turns),
531 total_cost_usd: (state.total_cost != 0.0).then_some(state.total_cost),
532 model: state.model.clone(),
533 };
534 if metrics.total_input_tokens.is_some()
535 || metrics.total_output_tokens.is_some()
536 || metrics.total_cache_read_input_tokens.is_some()
537 || metrics.total_cache_creation_input_tokens.is_some()
538 || metrics.total_turns.is_some()
539 || metrics.total_cost_usd.is_some()
540 || metrics.model.is_some()
541 {
542 result_extensions.insert(metrics);
543 }
544 }
545
546 Ok(WorkflowResult {
547 workflow_run_id: wf_run_id,
548 workflow_name: workflow.name.clone(),
549 all_succeeded: state.all_succeeded,
550 total_duration_ms: state.total_duration_ms,
551 extensions: result_extensions,
552 })
553}
554
555pub fn execute_single_node(
557 state: &mut ExecutionState,
558 node: &WorkflowNode,
559 iteration: u32,
560) -> Result<()> {
561 match node {
562 WorkflowNode::Call(n) => crate::executors::call::execute_call(state, n, iteration)?,
563 WorkflowNode::CallWorkflow(n) => {
564 crate::executors::call_workflow::execute_call_workflow(state, n, iteration)?
565 }
566 WorkflowNode::If(n) => crate::executors::control_flow::execute_if(state, n)?,
567 WorkflowNode::Unless(n) => crate::executors::control_flow::execute_unless(state, n)?,
568 WorkflowNode::While(n) => crate::executors::control_flow::execute_while(state, n)?,
569 WorkflowNode::DoWhile(n) => crate::executors::control_flow::execute_do_while(state, n)?,
570 WorkflowNode::Do(n) => crate::executors::control_flow::execute_do(state, n)?,
571 WorkflowNode::Parallel(n) => {
572 crate::executors::parallel::execute_parallel(state, n, iteration)?
573 }
574 WorkflowNode::Gate(n) => crate::executors::gate::execute_gate(state, n, iteration)?,
575 WorkflowNode::Script(n) => crate::executors::script::execute_script(state, n, iteration)?,
576 WorkflowNode::ForEach(n) => {
577 crate::executors::foreach::execute_foreach(state, n, iteration)?
578 }
579 WorkflowNode::Always(n) => {
580 execute_nodes(state, &n.body, false)?;
582 }
583 }
584 Ok(())
585}
586
587pub fn execute_nodes(
588 state: &mut ExecutionState,
589 nodes: &[WorkflowNode],
590 respect_fail_fast: bool,
591) -> Result<()> {
592 for node in nodes {
593 if respect_fail_fast && !state.all_succeeded && state.exec_config.fail_fast {
594 break;
595 }
596 if state.cancellation.is_cancelled() {
598 return state.cancellation.error_if_cancelled();
599 }
600 state.check_cancellation_throttled()?;
601 execute_single_node(state, node, 0)?;
602 }
603 Ok(())
604}
605
606pub fn record_step_failure(
608 state: &mut ExecutionState,
609 step_key: String,
610 step_label: &str,
611 last_error: String,
612 max_attempts: u32,
613 started: bool,
614) -> Result<()> {
615 state.all_succeeded = false;
616 let step_result = StepResult::failed(step_label, last_error);
617 state.step_results.insert(step_key, step_result);
618
619 if state.exec_config.fail_fast {
620 let msg = if started {
621 format!(
622 "Step '{}' failed after {} attempts",
623 step_label, max_attempts
624 )
625 } else {
626 format!("Step '{}' failed to start (never executed)", step_label)
627 };
628 return Err(EngineError::Workflow(msg));
629 }
630
631 Ok(())
632}
633
634pub fn record_step_skipped(state: &mut ExecutionState, step_key: String, step_label: &str) {
636 tracing::info!("Step '{}' skipped via on_fail = continue", step_label);
637 let step_result = StepResult::skipped(step_label);
638 state.step_results.insert(step_key, step_result);
639}
640
641fn parse_metric_f64(map: &std::collections::HashMap<String, String>, key: &str) -> Option<f64> {
643 map.get(key).and_then(|v| {
644 v.parse::<f64>()
645 .map_err(|e| tracing::warn!("metadata key '{key}' has non-numeric value '{v}': {e}"))
646 .ok()
647 })
648}
649
650fn parse_metric_i64(map: &std::collections::HashMap<String, String>, key: &str) -> Option<i64> {
652 map.get(key).and_then(|v| {
653 v.parse::<i64>()
654 .map_err(|e| tracing::warn!("metadata key '{key}' has non-integer value '{v}': {e}"))
655 .ok()
656 })
657}
658
659pub fn record_step_success(
661 state: &mut ExecutionState,
662 step_key: String,
663 success: crate::types::StepSuccess,
664) {
665 use crate::constants::metadata_keys;
666 let cost_usd = parse_metric_f64(&success.metadata, metadata_keys::COST_USD);
667 let num_turns = parse_metric_i64(&success.metadata, metadata_keys::NUM_TURNS);
668 let duration_ms = parse_metric_i64(&success.metadata, metadata_keys::DURATION_MS);
669 let input_tokens = parse_metric_i64(&success.metadata, metadata_keys::INPUT_TOKENS);
670 let output_tokens = parse_metric_i64(&success.metadata, metadata_keys::OUTPUT_TOKENS);
671 let cache_read = parse_metric_i64(&success.metadata, metadata_keys::CACHE_READ_INPUT_TOKENS);
672 let cache_creation = parse_metric_i64(
673 &success.metadata,
674 metadata_keys::CACHE_CREATION_INPUT_TOKENS,
675 );
676 if state.accumulate_metrics(
677 cost_usd,
678 num_turns,
679 duration_ms,
680 input_tokens,
681 output_tokens,
682 cache_read,
683 cache_creation,
684 ) {
685 state.has_llm_metrics = true;
686 }
687
688 let step_result = StepResult::completed(&success);
689 state.step_results.insert(step_key, step_result);
690
691 state.contexts.push(success.into());
692}
693
694pub fn resolve_child_inputs(
697 raw_inputs: &HashMap<String, String>,
698 vars: &HashMap<String, String>,
699 input_decls: &[crate::dsl::InputDecl],
700) -> std::result::Result<HashMap<String, String>, String> {
701 let mut child_inputs = HashMap::new();
702 for (k, v) in raw_inputs {
703 child_inputs.insert(
704 k.clone(),
705 crate::prompt_builder::substitute_variables_keep_literal(v, vars),
706 );
707 }
708 for decl in input_decls {
709 if !child_inputs.contains_key(&decl.name) {
710 if decl.required {
711 return Err(decl.name.clone());
712 }
713 if let Some(ref default) = decl.default {
714 child_inputs.insert(decl.name.clone(), default.clone());
715 }
716 if decl.input_type == crate::dsl::InputType::Boolean {
717 child_inputs
718 .entry(decl.name.clone())
719 .or_insert_with(|| "false".to_string());
720 }
721 }
722 }
723 Ok(child_inputs)
724}
725
726pub fn run_on_fail_agent(
728 state: &mut ExecutionState,
729 step_label: &str,
730 on_fail_agent: &crate::dsl::AgentRef,
731 last_error: &str,
732 retries: u32,
733 iteration: u32,
734) {
735 tracing::warn!(
736 "All retries exhausted for '{}', running on_fail agent '{}'",
737 step_label,
738 on_fail_agent.label(),
739 );
740 state
741 .inputs
742 .insert("failed_step".to_string(), step_label.to_string());
743 state
744 .inputs
745 .insert("failure_reason".to_string(), last_error.to_string());
746 state
747 .inputs
748 .insert("retry_count".to_string(), retries.to_string());
749
750 let on_fail_node = crate::dsl::CallNode {
751 agent: on_fail_agent.clone(),
752 retries: 0,
753 on_fail: None,
754 output: None,
755 with: Vec::new(),
756 as_identity: None,
757 plugin_dirs: Vec::new(),
758 timeout: None,
759 max_turns: None,
760 };
761 if let Err(e) = crate::executors::call::execute_call(state, &on_fail_node, iteration) {
762 tracing::warn!("on_fail agent '{}' also failed: {e}", on_fail_agent.label(),);
763 }
764
765 state.inputs.remove("failed_step");
766 state.inputs.remove("failure_reason");
767 state.inputs.remove("retry_count");
768}
769
770#[allow(clippy::too_many_arguments)]
772pub fn handle_on_fail(
773 state: &mut ExecutionState,
774 step_key: String,
775 step_label: &str,
776 on_fail: &Option<OnFail>,
777 last_error: String,
778 retries: u32,
779 iteration: u32,
780 max_attempts: u32,
781) -> Result<()> {
782 match on_fail {
783 Some(OnFail::Continue) => {
784 record_step_skipped(state, step_key, step_label);
785 return Ok(());
786 }
787 Some(OnFail::Agent(ref on_fail_agent)) => {
788 run_on_fail_agent(
789 state,
790 step_label,
791 on_fail_agent,
792 &last_error,
793 retries,
794 iteration,
795 );
796 }
797 None => {}
798 }
799 record_step_failure(state, step_key, step_label, last_error, max_attempts, true)
800}
801
802pub fn should_skip(state: &ExecutionState, step_name: &str, iteration: u32) -> bool {
804 state.resume_ctx.as_ref().is_some_and(|ctx| {
805 ctx.step_map
806 .get(step_name)
807 .is_some_and(|m| m.contains_key(&iteration))
808 })
809}
810
811fn parse_markers_out(markers_json: Option<&str>, step_name: &str) -> Vec<String> {
813 markers_json
814 .and_then(|m| {
815 serde_json::from_str(m)
816 .map_err(|e| {
817 tracing::warn!("Malformed markers_out JSON in step '{step_name}': {e}")
818 })
819 .ok()
820 })
821 .unwrap_or_default()
822}
823
824pub fn restore_step(state: &mut ExecutionState, key: &str, iteration: u32) {
827 let ctx = state.resume_ctx.take();
828 if let Some(ref ctx) = ctx {
829 restore_completed_step(state, ctx, key, iteration);
830 }
831 state.resume_ctx = ctx;
832}
833
834pub fn restore_completed_step(
836 state: &mut ExecutionState,
837 ctx: &ResumeContext,
838 step_key: &str,
839 iteration: u32,
840) {
841 let completed_step = ctx.step_map.get(step_key).and_then(|m| m.get(&iteration));
842
843 let Some(step) = completed_step else {
844 tracing::warn!(
845 "resume: step '{step_key}:{iteration}' in skip set but not found in resume context \
846 — downstream variable substitution may be incorrect"
847 );
848 return;
849 };
850
851 let markers = parse_markers_out(step.markers_out.as_deref(), step_key);
852 let context = step.context_out.clone().unwrap_or_default();
853
854 if let Some(ref feedback) = step.gate_feedback {
856 state.last_gate_feedback = Some(feedback.clone());
857 }
858
859 let success = crate::types::StepSuccess::from_workflow_run_step(
860 step_key.to_string(),
861 step,
862 markers,
863 context,
864 iteration,
865 );
866 let step_result = StepResult::completed_without_metrics(&success);
867 state.step_results.insert(step_key.to_string(), step_result);
868
869 state.contexts.push(success.into());
870}
871
/// Data gathered from a finished child workflow run by
/// `fetch_child_completion_data`.
///
/// The tuple holds, in order:
/// 1. the final output — `(markers, context)` of the completed step with the
///    highest position (both empty when there is none);
/// 2. the results of the completed steps, keyed by step name;
/// 3. the context entries of the completed steps, ordered by position.
pub type ChildCompletionData = (
    (Vec<String>, String),
    HashMap<String, StepResult>,
    Vec<ContextEntry>,
);
883
884pub fn fetch_child_completion_data(
892 persistence: &dyn WorkflowPersistence,
893 workflow_run_id: &str,
894) -> ChildCompletionData {
895 let steps = match persistence.get_steps(workflow_run_id) {
896 Ok(s) => s,
897 Err(e) => {
898 tracing::warn!(
899 "Failed to fetch steps for child workflow run '{}': {e}",
900 workflow_run_id,
901 );
902 return ((Vec::new(), String::new()), HashMap::new(), Vec::new());
903 }
904 };
905
906 let mut completed: Vec<_> = steps
909 .into_iter()
910 .filter(|s| s.status == WorkflowStepStatus::Completed)
911 .collect();
912 completed.sort_by_key(|s| s.position);
913
914 let final_output = match completed.iter().max_by_key(|s| s.position) {
915 Some(step) => {
916 let markers = parse_markers_out(step.markers_out.as_deref(), &step.step_name);
917 let context = step.context_out.clone().unwrap_or_default();
918 (markers, context)
919 }
920 None => (Vec::new(), String::new()),
921 };
922
923 let mut child_steps = HashMap::with_capacity(completed.len());
928 let mut child_contexts = Vec::with_capacity(completed.len());
929 for s in completed {
930 let markers = parse_markers_out(s.markers_out.as_deref(), &s.step_name);
931 let context = s.context_out.clone().unwrap_or_default();
932 let success = crate::types::StepSuccess::from_workflow_run_step(
933 s.step_name.clone(),
934 &s,
935 markers,
936 context,
937 0,
938 );
939 let result = StepResult::completed_without_metrics(&success);
940 let entry: ContextEntry = success.into();
941 child_steps.insert(s.step_name, result);
942 child_contexts.push(entry);
943 }
944
945 (final_output, child_steps, child_contexts)
946}
947
948pub fn check_stuck(
951 state: &mut ExecutionState,
952 prev_marker_sets: &mut VecDeque<HashSet<String>>,
953 step: &str,
954 marker: &str,
955 stuck_after: u32,
956 loop_kind: &str,
957) -> Result<()> {
958 let current_markers: HashSet<String> = state
959 .step_results
960 .get(step)
961 .map(|r| r.markers.iter().cloned().collect())
962 .unwrap_or_default();
963
964 prev_marker_sets.push_back(current_markers.clone());
965 if prev_marker_sets.len() > stuck_after as usize {
966 prev_marker_sets.pop_front();
967 }
968
969 if prev_marker_sets.len() >= stuck_after as usize
970 && prev_marker_sets.iter().all(|s| s == ¤t_markers)
971 {
972 tracing::warn!(
973 "{loop_kind} {step}.{marker} — stuck: identical markers for {stuck_after} consecutive iterations",
974 );
975 state.all_succeeded = false;
976 return Err(EngineError::Workflow(format!(
977 "{loop_kind} {step}.{marker} stuck after {stuck_after} iterations with identical markers",
978 )));
979 }
980
981 Ok(())
982}
983
984pub fn check_max_iterations(
986 state: &mut ExecutionState,
987 iteration: u32,
988 max_iterations: u32,
989 on_max_iter: &crate::dsl::OnMaxIter,
990 step: &str,
991 marker: &str,
992 loop_kind: &str,
993) -> Result<bool> {
994 if iteration >= max_iterations {
995 tracing::warn!("{loop_kind} {step}.{marker} — reached max_iterations ({max_iterations})",);
996 match on_max_iter {
997 crate::dsl::OnMaxIter::Fail => {
998 state.all_succeeded = false;
999 return Err(EngineError::Workflow(format!(
1000 "{loop_kind} {step}.{marker} reached max_iterations ({max_iterations})",
1001 )));
1002 }
1003 crate::dsl::OnMaxIter::Continue => return Ok(true),
1004 }
1005 }
1006 Ok(false)
1007}
1008
1009pub fn build_variable_map(state: &ExecutionState) -> HashMap<String, String> {
1011 crate::prompt_builder::build_variable_map(state)
1012}
1013
1014pub fn flow_output_instruction() -> &'static str {
1016 FLOW_OUTPUT_INSTRUCTION
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::*;
1022 use crate::dsl::{InputDecl, InputType, WorkflowDef, WorkflowTrigger};
1023
1024 fn make_bool_workflow(
1025 name: &str,
1026 input_name: &str,
1027 required: bool,
1028 default: Option<&str>,
1029 ) -> WorkflowDef {
1030 WorkflowDef {
1031 name: name.to_string(),
1032 title: None,
1033 description: String::new(),
1034 trigger: WorkflowTrigger::Manual,
1035 targets: vec![],
1036 group: None,
1037 inputs: vec![InputDecl {
1038 name: input_name.to_string(),
1039 input_type: InputType::Boolean,
1040 required,
1041 default: default.map(|s| s.to_string()),
1042 description: None,
1043 }],
1044 body: vec![],
1045 always: vec![],
1046 source_path: String::new(),
1047 }
1048 }
1049
1050 #[test]
1051 fn test_boolean_input_defaults_to_false_when_absent() {
1052 let workflow = make_bool_workflow("wf", "flag", false, None);
1053 let mut inputs = HashMap::new();
1054 apply_workflow_input_defaults(&workflow, &mut inputs).unwrap();
1055 assert_eq!(inputs.get("flag").map(|s| s.as_str()), Some("false"));
1056 }
1057
1058 #[test]
1059 fn test_boolean_input_uses_explicit_default_over_false() {
1060 let workflow = make_bool_workflow("wf", "flag", false, Some("true"));
1061 let mut inputs = HashMap::new();
1062 apply_workflow_input_defaults(&workflow, &mut inputs).unwrap();
1063 assert_eq!(inputs.get("flag").map(|s| s.as_str()), Some("true"));
1064 }
1065
1066 #[test]
1067 fn test_boolean_input_caller_value_not_overwritten() {
1068 let workflow = make_bool_workflow("wf", "flag", false, None);
1069 let mut inputs = HashMap::new();
1070 inputs.insert("flag".to_string(), "true".to_string());
1071 apply_workflow_input_defaults(&workflow, &mut inputs).unwrap();
1072 assert_eq!(inputs.get("flag").map(|s| s.as_str()), Some("true"));
1073 }
1074
1075 #[test]
1076 fn test_boolean_input_required_and_missing_is_error() {
1077 let workflow = make_bool_workflow("wf", "flag", true, None);
1078 let mut inputs = HashMap::new();
1079 let result = apply_workflow_input_defaults(&workflow, &mut inputs);
1080 assert!(result.is_err(), "expected error for missing required input");
1081 }
1082
1083 #[test]
1084 fn fork_child_resets_runtime_state_and_preserves_shared_config() {
1085 use crate::cancellation::CancellationToken;
1086 use crate::persistence_memory::InMemoryWorkflowPersistence;
1087 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
1088 use crate::types::WorkflowExecConfig;
1089
1090 struct DummyChildRunner;
1091 impl ChildWorkflowRunner for DummyChildRunner {
1092 fn execute_child(
1093 &self,
1094 _workflow_name: &str,
1095 _parent_ctx: &ChildWorkflowContext,
1096 _params: ChildWorkflowInput,
1097 ) -> Result<crate::types::WorkflowResult> {
1098 unimplemented!()
1099 }
1100 fn resume_child(
1101 &self,
1102 _workflow_run_id: &str,
1103 _model: Option<&str>,
1104 _parent_ctx: &ChildWorkflowContext,
1105 ) -> Result<crate::types::WorkflowResult> {
1106 unimplemented!()
1107 }
1108 fn find_resumable_child(
1109 &self,
1110 _parent_run_id: &str,
1111 _workflow_name: &str,
1112 ) -> Result<Option<crate::types::WorkflowRun>> {
1113 unimplemented!()
1114 }
1115 }
1116
1117 let parent = ExecutionState {
1118 persistence: Arc::new(InMemoryWorkflowPersistence::new()),
1119 action_registry: Arc::new(crate::traits::action_executor::ActionRegistry::new(
1120 HashMap::new(),
1121 None,
1122 )),
1123 script_env_provider: Arc::new(NoOpScriptEnvProvider),
1124 workflow_run_id: "run-1".to_string(),
1125 workflow_name: "wf".to_string(),
1126 run_ctx: {
1127 let mut vars = std::collections::HashMap::new();
1128 vars.insert("worktree_id", "wt".to_string());
1129 vars.insert("repo_path", "/repo".to_string());
1130 vars.insert("ticket_id", "TICK-1".to_string());
1131 vars.insert("repo_id", "repo-1".to_string());
1132 Arc::new(
1133 crate::traits::run_context::NoopRunContext::with_vars(vars)
1134 .with_working_dir("/tmp"),
1135 ) as Arc<dyn RunContext>
1136 },
1137 extra_plugin_dirs: vec!["plugins".to_string()],
1138 model: Some("gpt-4".to_string()),
1139 exec_config: WorkflowExecConfig::default(),
1140 inputs: {
1141 let mut m = HashMap::new();
1142 m.insert("key".to_string(), "val".to_string());
1143 m
1144 },
1145 parent_run_id: "parent-1".to_string(),
1146 depth: 3,
1147 target_label: Some("label".to_string()),
1148 step_results: {
1149 let mut m = HashMap::new();
1150 m.insert("step".to_string(), StepResult::default());
1151 m
1152 },
1153 contexts: vec![ContextEntry {
1154 step: "step".to_string(),
1155 iteration: 1,
1156 context: "ctx".to_string(),
1157 markers: vec![],
1158 structured_output: None,
1159 output_file: None,
1160 }],
1161 position: 42,
1162 all_succeeded: false,
1163 total_cost: 1.23,
1164 total_turns: 5,
1165 total_duration_ms: 1000,
1166 total_input_tokens: 100,
1167 total_output_tokens: 200,
1168 total_cache_read_input_tokens: 50,
1169 total_cache_creation_input_tokens: 25,
1170 has_llm_metrics: false,
1171 last_gate_feedback: Some("feedback".to_string()),
1172 block_output: Some("output".to_string()),
1173 block_with: vec!["with".to_string()],
1174 resume_ctx: None,
1175 default_as_identity: Some("bot".to_string()),
1176 triggered_by_hook: true,
1177 schema_resolver: None,
1178 child_runner: Some(Arc::new(DummyChildRunner)),
1179 last_heartbeat_at: ExecutionState::new_heartbeat(),
1180 registry: Arc::new(crate::traits::item_provider::ItemProviderRegistry::new()),
1181 event_sinks: Arc::from(vec![]),
1182 cancellation: CancellationToken::new(),
1183 current_execution_id: Arc::new(std::sync::Mutex::new(None)),
1184 owner_token: None,
1185 lease_generation: None,
1186 };
1187
1188 let child_cancellation = CancellationToken::new();
1189 let child = parent.fork_child(child_cancellation.clone());
1190
1191 assert_eq!(child.workflow_run_id, "run-1");
1193 assert_eq!(child.workflow_name, "wf");
1194 assert_eq!(child.run_ctx.working_dir_str(), "/tmp");
1195 assert_eq!(child.model, Some("gpt-4".to_string()));
1196 assert_eq!(child.depth, 3);
1197 assert_eq!(child.target_label, Some("label".to_string()));
1198 assert_eq!(child.default_as_identity, Some("bot".to_string()));
1199 assert_eq!(child.parent_run_id, "parent-1");
1200
1201 assert!(child.inputs.is_empty(), "inputs should be cleared");
1203 assert!(
1204 child.step_results.is_empty(),
1205 "step_results should be cleared"
1206 );
1207 assert!(child.contexts.is_empty(), "contexts should be cleared");
1208 assert_eq!(child.position, 0);
1209 assert!(child.all_succeeded);
1210 assert_eq!(child.total_cost, 0.0);
1211 assert_eq!(child.total_turns, 0);
1212 assert_eq!(child.total_duration_ms, 0);
1213 assert_eq!(child.total_input_tokens, 0);
1214 assert_eq!(child.total_output_tokens, 0);
1215 assert_eq!(child.total_cache_read_input_tokens, 0);
1216 assert_eq!(child.total_cache_creation_input_tokens, 0);
1217 assert!(
1218 !child.has_llm_metrics,
1219 "has_llm_metrics should be reset in fork_child"
1220 );
1221 assert!(child.last_gate_feedback.is_none());
1222 assert!(child.block_output.is_none());
1223 assert!(child.block_with.is_empty());
1224 assert!(child.resume_ctx.is_none());
1225 assert!(!child.triggered_by_hook);
1226 assert!(child.schema_resolver.is_none());
1227 assert!(
1228 child.child_runner.is_some(),
1229 "child_runner should be cloned from parent"
1230 );
1231
1232 assert!(!child.cancellation.is_cancelled());
1234 assert!(std::sync::Arc::ptr_eq(
1235 &child.current_execution_id,
1236 &child.current_execution_id
1237 ));
1238 }
1239
1240 #[test]
1241 fn child_workflow_context_projects_fields() {
1242 use crate::cancellation::CancellationToken;
1243 use crate::events::{EngineEventData, EventSink};
1244 use crate::persistence_memory::InMemoryWorkflowPersistence;
1245 use crate::traits::script_env_provider::NoOpScriptEnvProvider;
1246 use crate::types::WorkflowExecConfig;
1247
1248 struct TestSink;
1249 impl EventSink for TestSink {
1250 fn emit(&self, _: &EngineEventData) {}
1251 }
1252
1253 let sinks: Arc<[Arc<dyn EventSink>]> = Arc::from(vec![
1254 Arc::new(TestSink) as Arc<dyn EventSink>,
1255 Arc::new(TestSink) as Arc<dyn EventSink>,
1256 ]);
1257
1258 let mut state_inputs = HashMap::new();
1259 state_inputs.insert("ticket_id".to_string(), "TICK-42".to_string());
1260 state_inputs.insert("repo_id".to_string(), "repo-7".to_string());
1261
1262 let exec_config = WorkflowExecConfig {
1264 dry_run: true,
1265 ..WorkflowExecConfig::default()
1266 };
1267
1268 let parent = ExecutionState {
1269 persistence: Arc::new(InMemoryWorkflowPersistence::new()),
1270 action_registry: Arc::new(crate::traits::action_executor::ActionRegistry::new(
1271 HashMap::new(),
1272 None,
1273 )),
1274 script_env_provider: Arc::new(NoOpScriptEnvProvider),
1275 workflow_run_id: "run-projection-test".to_string(),
1276 workflow_name: "wf-projection".to_string(),
1277 run_ctx: {
1278 let mut vars = std::collections::HashMap::new();
1279 vars.insert("worktree_id", "wt-9".to_string());
1280 vars.insert("repo_path", "/repo/proj".to_string());
1281 vars.insert("ticket_id", "TICK-42".to_string());
1282 vars.insert("repo_id", "repo-7".to_string());
1283 Arc::new(
1284 crate::traits::run_context::NoopRunContext::with_vars(vars)
1285 .with_working_dir("/tmp/proj"),
1286 ) as Arc<dyn RunContext>
1287 },
1288 extra_plugin_dirs: vec!["plugin-a".to_string()],
1289 model: Some("opus".to_string()),
1290 exec_config: exec_config.clone(),
1291 inputs: state_inputs.clone(),
1292 parent_run_id: "parent-7".to_string(),
1293 depth: 2,
1294 target_label: Some("proj-label".to_string()),
1295 step_results: HashMap::new(),
1296 contexts: vec![],
1297 position: 11,
1298 all_succeeded: false,
1299 total_cost: 0.0,
1300 total_turns: 0,
1301 total_duration_ms: 0,
1302 total_input_tokens: 0,
1303 total_output_tokens: 0,
1304 total_cache_read_input_tokens: 0,
1305 total_cache_creation_input_tokens: 0,
1306 has_llm_metrics: false,
1307 last_gate_feedback: None,
1308 block_output: None,
1309 block_with: vec![],
1310 resume_ctx: None,
1311 default_as_identity: None,
1312 triggered_by_hook: true,
1313 schema_resolver: None,
1314 child_runner: None,
1315 last_heartbeat_at: ExecutionState::new_heartbeat(),
1316 registry: Arc::new(crate::traits::item_provider::ItemProviderRegistry::new()),
1317 event_sinks: Arc::clone(&sinks),
1318 cancellation: CancellationToken::new(),
1319 current_execution_id: Arc::new(std::sync::Mutex::new(None)),
1320 owner_token: None,
1321 lease_generation: None,
1322 };
1323
1324 let ctx = parent.child_workflow_context();
1325
1326 assert_eq!(ctx.run_ctx.get("worktree_id").as_deref(), Some("wt-9"));
1328 assert_eq!(ctx.run_ctx.working_dir_str(), "/tmp/proj");
1329 assert_eq!(ctx.run_ctx.get("repo_path").as_deref(), Some("/repo/proj"));
1330 assert_eq!(ctx.run_ctx.get("ticket_id").as_deref(), Some("TICK-42"));
1331 assert_eq!(ctx.run_ctx.get("repo_id").as_deref(), Some("repo-7"));
1332 assert_eq!(ctx.extra_plugin_dirs, vec!["plugin-a"]);
1333 assert_eq!(ctx.workflow_run_id, "run-projection-test");
1334 assert_eq!(ctx.model.as_deref(), Some("opus"));
1335 assert!(ctx.exec_config.dry_run);
1336 assert_eq!(ctx.inputs, state_inputs);
1337
1338 assert_eq!(ctx.event_sinks.len(), 2);
1340 assert!(
1341 Arc::ptr_eq(&ctx.event_sinks, &sinks),
1342 "event_sinks slice should be shared via Arc, not cloned"
1343 );
1344 }
1345
1346 use crate::test_helpers::CountingPersistence;
1347
1348 fn make_state_with_counting_persistence(
1350 cp: std::sync::Arc<CountingPersistence>,
1351 run_id: String,
1352 ) -> ExecutionState {
1353 crate::test_helpers::make_test_execution_state(
1354 cp as Arc<dyn crate::traits::persistence::WorkflowPersistence>,
1355 run_id,
1356 )
1357 }
1358
1359 #[test]
1362 fn check_cancellation_throttled_propagates_external_cancel() {
1363 let cp = Arc::new(CountingPersistence::new());
1364 cp.set_cancelled(true);
1365 let state = make_state_with_counting_persistence(Arc::clone(&cp), "run-1".into());
1366
1367 assert!(!state.cancellation.is_cancelled());
1368 let result = state.check_cancellation_throttled();
1369 assert!(
1370 matches!(result, Err(EngineError::Cancelled(_))),
1371 "expected Err(Cancelled), got {result:?}"
1372 );
1373 assert!(
1374 state.cancellation.is_cancelled(),
1375 "helper must set state.cancellation on external cancel"
1376 );
1377 }
1378
1379 #[test]
1380 fn check_stuck_bounds_buffer() {
1381 use crate::persistence_memory::InMemoryWorkflowPersistence;
1382 use crate::types::StepResult;
1383
1384 let mut state = crate::test_helpers::make_test_execution_state(
1385 Arc::new(InMemoryWorkflowPersistence::new()),
1386 "run-bounds".into(),
1387 );
1388
1389 let stuck_after = 3u32;
1390 let mut prev_marker_sets: VecDeque<HashSet<String>> = VecDeque::new();
1391
1392 for i in 0u32..10 {
1393 let result = StepResult {
1394 markers: vec![format!("marker-{i}")],
1395 ..Default::default()
1396 };
1397 state.step_results.insert("step".to_string(), result);
1398
1399 let res = check_stuck(
1400 &mut state,
1401 &mut prev_marker_sets,
1402 "step",
1403 "m",
1404 stuck_after,
1405 "while",
1406 );
1407 assert!(
1408 res.is_ok(),
1409 "should not be stuck with changing markers at iteration {i}"
1410 );
1411 assert!(
1412 prev_marker_sets.len() <= stuck_after as usize,
1413 "buffer exceeded stuck_after at iteration {i}: len={}",
1414 prev_marker_sets.len()
1415 );
1416 }
1417 }
1418
1419 #[test]
1420 fn check_stuck_detects_stuck() {
1421 use crate::persistence_memory::InMemoryWorkflowPersistence;
1422 use crate::types::StepResult;
1423
1424 let mut state = crate::test_helpers::make_test_execution_state(
1425 Arc::new(InMemoryWorkflowPersistence::new()),
1426 "run-stuck".into(),
1427 );
1428
1429 let stuck_after = 3u32;
1430 let mut prev_marker_sets: VecDeque<HashSet<String>> = VecDeque::new();
1431
1432 let step = StepResult {
1433 markers: vec!["same-marker".to_string()],
1434 ..Default::default()
1435 };
1436 state.step_results.insert("step".to_string(), step);
1437
1438 for i in 0u32..stuck_after {
1439 let res = check_stuck(
1440 &mut state,
1441 &mut prev_marker_sets,
1442 "step",
1443 "m",
1444 stuck_after,
1445 "while",
1446 );
1447 if i + 1 < stuck_after {
1448 assert!(res.is_ok(), "should not be stuck yet at iteration {i}");
1449 } else {
1450 assert!(res.is_err(), "should detect stuck at iteration {i}");
1451 }
1452 }
1453 }
1454
1455 #[test]
1456 fn fetch_child_completion_data_bubbles_contexts_in_position_order() {
1457 use crate::persistence_memory::InMemoryWorkflowPersistence;
1458 use crate::traits::persistence::{NewStep, StepUpdate};
1459
1460 let p = InMemoryWorkflowPersistence::new();
1461 let child_run = "01CHILDRUNID0000000000000";
1462 p.seed_run(child_run);
1463
1464 let step_b = p
1469 .insert_step(NewStep {
1470 workflow_run_id: child_run.to_string(),
1471 step_name: "step-b".to_string(),
1472 role: "actor".to_string(),
1473 can_commit: false,
1474 position: 2,
1475 iteration: 0,
1476 retry_count: Some(0),
1477 })
1478 .unwrap();
1479 let step_a = p
1480 .insert_step(NewStep {
1481 workflow_run_id: child_run.to_string(),
1482 step_name: "step-a".to_string(),
1483 role: "actor".to_string(),
1484 can_commit: false,
1485 position: 1,
1486 iteration: 0,
1487 retry_count: Some(0),
1488 })
1489 .unwrap();
1490
1491 p.update_step(
1492 &step_a,
1493 StepUpdate::completed(
1494 0,
1495 None,
1496 Some("a-result".into()),
1497 Some("context-from-a".into()),
1498 Some(r#"["m-a"]"#.into()),
1499 0,
1500 Some(r#"{"k":"a"}"#.into()),
1501 ),
1502 )
1503 .unwrap();
1504 p.update_step(
1505 &step_b,
1506 StepUpdate::completed(
1507 0,
1508 None,
1509 Some("b-result".into()),
1510 Some("context-from-b".into()),
1511 Some(r#"["m-b"]"#.into()),
1512 0,
1513 Some(r#"{"k":"b"}"#.into()),
1514 ),
1515 )
1516 .unwrap();
1517
1518 let ((final_markers, final_context), step_results, child_contexts) =
1519 fetch_child_completion_data(&p, child_run);
1520
1521 assert_eq!(final_markers, vec!["m-b".to_string()]);
1523 assert_eq!(final_context, "context-from-b");
1524
1525 assert!(step_results.contains_key("step-a"));
1527 assert!(step_results.contains_key("step-b"));
1528
1529 assert_eq!(child_contexts.len(), 2);
1533 assert_eq!(child_contexts[0].step, "step-a");
1534 assert_eq!(child_contexts[0].context, "context-from-a");
1535 assert_eq!(child_contexts[0].markers, vec!["m-a".to_string()]);
1536 assert_eq!(
1537 child_contexts[0].structured_output.as_deref(),
1538 Some(r#"{"k":"a"}"#)
1539 );
1540 assert_eq!(child_contexts[1].step, "step-b");
1541 assert_eq!(child_contexts[1].context, "context-from-b");
1542 assert_eq!(child_contexts[1].markers, vec!["m-b".to_string()]);
1543 assert_eq!(
1544 child_contexts[1].structured_output.as_deref(),
1545 Some(r#"{"k":"b"}"#)
1546 );
1547 }
1548
1549 #[test]
1550 fn fetch_child_completion_data_returns_empty_contexts_on_persistence_error() {
1551 use crate::persistence_memory::InMemoryWorkflowPersistence;
1552
1553 let p = InMemoryWorkflowPersistence::new();
1554 p.set_fail_get_steps(true);
1555
1556 let ((markers, context), step_results, child_contexts) =
1557 fetch_child_completion_data(&p, "any-run-id");
1558
1559 assert!(markers.is_empty());
1560 assert!(context.is_empty());
1561 assert!(step_results.is_empty());
1562 assert!(child_contexts.is_empty());
1563 }
1564
1565 #[test]
1566 fn child_workflow_context_new_sets_required_fields_and_zeros_optional() {
1567 use crate::traits::run_context::NoopRunContext;
1568 use crate::types::WorkflowExecConfig;
1569
1570 let run_ctx = Arc::new(NoopRunContext::default()) as Arc<dyn RunContext>;
1571 let ctx = ChildWorkflowContext::new(
1572 Arc::clone(&run_ctx),
1573 vec!["plugins".to_string()],
1574 "run-42".to_string(),
1575 Some("gpt-4".to_string()),
1576 WorkflowExecConfig::default(),
1577 HashMap::new(),
1578 Arc::from(vec![]),
1579 );
1580
1581 assert_eq!(ctx.workflow_run_id, "run-42");
1582 assert_eq!(ctx.extra_plugin_dirs, vec!["plugins"]);
1583 assert_eq!(ctx.model.as_deref(), Some("gpt-4"));
1584 assert!(ctx.inputs.is_empty());
1585 assert_eq!(ctx.event_sinks.len(), 0);
1586 }
1587}