// simple_agents_workflow/yaml_runner.rs
1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14    ToolFunction, ToolType,
15};
16use simple_agents_core::{
17    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24    flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29    WorkflowRuntimeOptions,
30};
31use crate::visualize::workflow_to_mermaid;
32
// Sentinel node ids injected by the YAML runner. NOTE(review): their use is
// not visible in this chunk — presumably the synthetic entry node and the
// synthetic LLM-call tool registered with the runtime; confirm downstream.
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";
/// Process-wide counter XORed into generated trace ids (`generate_trace_id`)
/// so two ids minted within the same nanosecond still differ.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
/// Timing (and, for LLM nodes, optional token usage) recorded for one
/// executed workflow step. `None` fields are omitted when serialized.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the executed node.
    pub node_id: String,
    /// Node kind label; `"llm_call"` marks LLM steps (see `workflow_nerdstats`).
    pub node_kind: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Wall-clock duration of the step in milliseconds.
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
56
/// Per-LLM-node metrics where token usage is known (all counts required,
/// unlike the optional fields on `YamlStepTiming`).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Completion tokens per second, rounded to two decimals.
    pub tokens_per_second: f64,
}
67
/// Final result of one YAML workflow run: terminal output, per-node outputs,
/// step timings, and run-wide token/latency aggregates (nerdstats inputs).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    // NOTE(review): name suggests the run's raw input text for an
    // email-processing example workflow — confirm against callers.
    pub email_text: String,
    /// Sequence of visited node ids (execution order assumed — TODO confirm).
    pub trace: Vec<String>,
    /// Node outputs keyed by node id (BTreeMap for stable serialization order).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    /// Time to first token, if any LLM node streamed one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
94
/// Controls how node payloads are recorded on spans: verbatim JSON, or a
/// redacted placeholder that only names the value's type (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
102
/// Controls tool-call payload capture in traces: full values, a redacted
/// type-only placeholder, or nothing at all (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
111
/// Caller-supplied distributed-tracing context. `trace_id`/`span_id` may be
/// given directly, or derived from a W3C `traceparent` header; `baggage`
/// carries propagated key/value pairs.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// W3C Trace Context header, e.g. `00-<32 hex>-<16 hex>-<2 hex>`.
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
127
/// Optional tenant identifiers attached to spans as `tenant.*` attributes
/// (see `apply_trace_tenant_attributes`) and echoed into run metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
141
/// Telemetry knobs for a run. The serde field defaults are mirrored by the
/// manual `Default` impl below — keep the two in sync.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Whether to attach the detailed stats blob (`workflow_nerdstats`).
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Trace sampling probability in [0.0, 1.0]; validated by `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
159
160impl Default for YamlWorkflowTelemetryConfig {
161    fn default() -> Self {
162        Self {
163            enabled: true,
164            nerdstats: true,
165            sample_rate: 1.0,
166            payload_mode: YamlWorkflowPayloadMode::FullPayload,
167            retention_days: 30,
168            multi_tenant: true,
169            tool_trace_mode: YamlToolTraceMode::Full,
170        }
171    }
172}
173
/// Trace-related run options: an optional incoming trace context plus tenant
/// identifiers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
181
/// Top-level options for one YAML workflow run: telemetry config, trace
/// context/tenant info, and an optional model override applied to LLM nodes
/// (see `resolve_requested_model`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    #[serde(default)]
    pub model: Option<String>,
}
191
/// Token usage reported by the provider for one LLM call; accumulated into
/// run totals by `YamlTokenTotals::add_usage`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only when the provider reports reasoning tokens separately.
    pub reasoning_tokens: Option<u32>,
}
199
/// Outcome of executing one LLM node: the resolved payload, optional provider
/// usage, optional time-to-first-token, and any tool calls made.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
207
/// Record of a single tool invocation made during an LLM node, including its
/// arguments, result (if any), status label, duration, and error text.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    pub arguments: Value,
    pub output: Option<Value>,
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
218
/// Run-wide token accumulator summed across LLM calls.
/// `reasoning_tokens` stays `None` until at least one call reports them.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    reasoning_tokens: Option<u64>,
}
226
227impl YamlTokenTotals {
228    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
229        self.input_tokens += u64::from(usage.prompt_tokens);
230        self.output_tokens += u64::from(usage.completion_tokens);
231        self.total_tokens += u64::from(usage.total_tokens);
232
233        if let Some(reasoning_tokens) = usage.reasoning_tokens {
234            let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
235            self.reasoning_tokens = Some(next);
236        }
237    }
238
239    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
240        if elapsed_ms == 0 {
241            return 0.0;
242        }
243        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
244    }
245}
246
/// Rounds `value` to two decimal places (half-away-from-zero via `f64::round`).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = (value * 100.0).round();
    scaled / 100.0
}
250
/// Completion tokens per second over `elapsed_ms`, rounded to two decimal
/// places; a zero duration yields 0.0 instead of dividing by zero.
fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
    if elapsed_ms == 0 {
        return 0.0;
    }
    let raw = f64::from(completion_tokens) * 1000.0 / (elapsed_ms as f64);
    // Inline two-decimal rounding (same formula as `round_two_decimals`).
    (raw * 100.0).round() / 100.0
}
257
/// Picks the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's own configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => node_model.to_string(),
    }
}
270
/// Serde `#[serde(default = "default_true")]` helper for boolean fields.
fn default_true() -> bool {
    true
}
274
/// Serde default for `YamlWorkflowTelemetryConfig::sample_rate` (sample everything).
fn default_sample_rate() -> f32 {
    1.0
}
278
/// Serde default for `YamlWorkflowTelemetryConfig::retention_days`.
fn default_retention_days() -> u32 {
    30
}
282
283fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
284    if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
285        return Ok(());
286    }
287
288    Err(YamlWorkflowRunError::InvalidInput {
289        message: format!(
290            "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
291        ),
292    })
293}
294
/// Deterministic sampling decision for a trace id.
///
/// Rates >= 1.0 always sample and <= 0.0 never sample; in between, an FNV-1a
/// hash of the id is mapped onto [0, 1) and compared against the rate, so the
/// same id always gets the same decision. (A NaN rate falls through both
/// guards and the final comparison returns false.)
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a: offset basis 0xcbf29ce484222325, prime 0x100000001b3.
    let hash = trace_id.bytes().fold(0xcbf29ce484222325u64, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
    });

    (hash as f64) / (u64::MAX as f64) < f64::from(sample_rate)
}
312
/// Extracts the (lowercased) trace id from a W3C `traceparent` header.
///
/// Requires exactly four `-`-separated fields, a 2-hex-digit version, and a
/// 32-hex-digit, not-all-zero trace id. The span-id and flags fields are not
/// validated beyond their presence, matching the original lenient behavior.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let parts: Vec<&str> = traceparent.split('-').collect();
    let &[version, trace_id, _span_id, _flags] = parts.as_slice() else {
        return None;
    };

    let is_hex = |value: &str| value.chars().all(|ch| ch.is_ascii_hexdigit());

    if version.len() != 2 || !is_hex(version) {
        return None;
    }
    if trace_id.len() != 32 || !is_hex(trace_id) {
        return None;
    }
    // An all-zero trace id is invalid per the W3C spec.
    if trace_id.chars().all(|ch| ch == '0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
339
/// Which source supplied the run's trace id; serialized as a label in
/// `workflow_metadata_with_trace` (precedence order documented on
/// `resolve_run_trace_id_with_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    Disabled,
    ExplicitTraceId,
    ParentTraceId,
    Traceparent,
    ParentTraceparent,
    Generated,
}
349
/// Result of `resolve_telemetry_context`: the trace id (if telemetry is
/// enabled), the sampling decision, and where the id came from.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
356
/// Resolves the trace id for a run, tagging which source won.
///
/// Precedence (first hit wins):
/// 1. telemetry disabled -> no id at all
/// 2. explicit `trace.context.trace_id` from the caller
/// 3. trace id carried by the parent `TraceContext`
/// 4. trace id parsed from the caller's `traceparent` header
/// 5. trace id parsed from the parent context's `traceparent`
/// 6. a freshly generated id
fn resolve_run_trace_id_with_source(
    options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
) -> (Option<String>, TraceIdSource) {
    // Telemetry off: suppress the trace id entirely.
    if !options.telemetry.enabled {
        return (None, TraceIdSource::Disabled);
    }

    // 2. Caller-provided explicit trace id.
    if let Some(trace_id) = options
        .trace
        .context
        .as_ref()
        .and_then(|context| context.trace_id.clone())
    {
        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
    }

    // 3. Inherit from the parent span's context.
    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
        return (Some(trace_id), TraceIdSource::ParentTraceId);
    }

    // 4. Parse the caller's W3C traceparent header.
    if let Some(trace_id) = options
        .trace
        .context
        .as_ref()
        .and_then(|context| context.traceparent.as_deref())
        .and_then(trace_id_from_traceparent)
    {
        return (Some(trace_id), TraceIdSource::Traceparent);
    }

    // 5. Parse the parent context's traceparent header.
    if let Some(trace_id) = parent_trace_context
        .and_then(|context| context.traceparent.as_deref())
        .and_then(trace_id_from_traceparent)
    {
        return (Some(trace_id), TraceIdSource::ParentTraceparent);
    }

    // 6. Nothing inherited: mint a new id.
    (Some(generate_trace_id()), TraceIdSource::Generated)
}
397
398fn resolve_telemetry_context(
399    options: &YamlWorkflowRunOptions,
400    parent_trace_context: Option<&TraceContext>,
401) -> ResolvedTelemetryContext {
402    let (trace_id, trace_id_source) =
403        resolve_run_trace_id_with_source(options, parent_trace_context);
404    let sampled = trace_id
405        .as_deref()
406        .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
407        .unwrap_or(false);
408
409    ResolvedTelemetryContext {
410        trace_id,
411        sampled,
412        trace_id_source,
413    }
414}
415
416fn generate_trace_id() -> String {
417    use std::time::{SystemTime, UNIX_EPOCH};
418
419    let now_nanos = SystemTime::now()
420        .duration_since(UNIX_EPOCH)
421        .map(|duration| duration.as_nanos())
422        .unwrap_or(0);
423    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
424    format!("{:032x}", now_nanos ^ sequence)
425}
426
427fn workflow_metadata_with_trace(
428    options: &YamlWorkflowRunOptions,
429    trace_id: &str,
430    sampled: bool,
431    trace_id_source: TraceIdSource,
432) -> Value {
433    json!({
434        "telemetry": {
435            "trace_id": trace_id,
436            "trace_id_source": match trace_id_source {
437                TraceIdSource::Disabled => "disabled",
438                TraceIdSource::ExplicitTraceId => "explicit_trace_id",
439                TraceIdSource::ParentTraceId => "parent_trace_id",
440                TraceIdSource::Traceparent => "traceparent",
441                TraceIdSource::ParentTraceparent => "parent_traceparent",
442                TraceIdSource::Generated => "generated",
443            },
444            "enabled": options.telemetry.enabled,
445            "sampled": sampled,
446            "nerdstats": options.telemetry.nerdstats,
447            "sample_rate": options.telemetry.sample_rate,
448            "payload_mode": match options.telemetry.payload_mode {
449                YamlWorkflowPayloadMode::FullPayload => "full_payload",
450                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
451            },
452            "retention_days": options.telemetry.retention_days,
453            "multi_tenant": options.telemetry.multi_tenant,
454            "tool_trace_mode": match options.telemetry.tool_trace_mode {
455                YamlToolTraceMode::Full => "full",
456                YamlToolTraceMode::Redacted => "redacted",
457                YamlToolTraceMode::Off => "off",
458            },
459        },
460        "trace": {
461            "tenant": {
462                "workspace_id": options.trace.tenant.workspace_id,
463                "user_id": options.trace.tenant.user_id,
464                "conversation_id": options.trace.tenant.conversation_id,
465                "request_id": options.trace.tenant.request_id,
466                "run_id": options.trace.tenant.run_id,
467            }
468        },
469    })
470}
471
472fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
473    if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
474        span.set_attribute("tenant.workspace_id", workspace_id);
475    }
476    if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
477        span.set_attribute("tenant.user_id", user_id);
478    }
479    if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
480        span.set_attribute("tenant.conversation_id", conversation_id);
481    }
482    if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
483        span.set_attribute("tenant.request_id", request_id);
484    }
485    if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
486        span.set_attribute("tenant.run_id", run_id);
487    }
488}
489
490fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
491    let llm_nodes_without_usage: Vec<String> = output
492        .step_timings
493        .iter()
494        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
495        .map(|step| step.node_id.clone())
496        .collect();
497    let token_metrics_available = llm_nodes_without_usage.is_empty();
498    let token_metrics_source = if token_metrics_available {
499        "provider_usage"
500    } else {
501        "provider_stream_usage_unavailable"
502    };
503    let total_input_tokens = if token_metrics_available {
504        json!(output.total_input_tokens)
505    } else {
506        Value::Null
507    };
508    let total_output_tokens = if token_metrics_available {
509        json!(output.total_output_tokens)
510    } else {
511        Value::Null
512    };
513    let total_tokens = if token_metrics_available {
514        json!(output.total_tokens)
515    } else {
516        Value::Null
517    };
518    let total_reasoning_tokens = if token_metrics_available {
519        json!(output.total_reasoning_tokens)
520    } else {
521        Value::Null
522    };
523    let tokens_per_second = if token_metrics_available {
524        json!(output.tokens_per_second)
525    } else {
526        Value::Null
527    };
528
529    json!({
530        "workflow_id": output.workflow_id,
531        "terminal_node": output.terminal_node,
532        "total_elapsed_ms": output.total_elapsed_ms,
533        "ttft_ms": output.ttft_ms,
534        "step_details": output.step_timings,
535        "total_input_tokens": total_input_tokens,
536        "total_output_tokens": total_output_tokens,
537        "total_tokens": total_tokens,
538        "total_reasoning_tokens": total_reasoning_tokens,
539        "tokens_per_second": tokens_per_second,
540        "trace_id": output.trace_id,
541        "token_metrics_available": token_metrics_available,
542        "token_metrics_source": token_metrics_source,
543        "llm_nodes_without_usage": llm_nodes_without_usage,
544    })
545}
546
547fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
548    match mode {
549        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
550        YamlWorkflowPayloadMode::RedactedPayload => json!({
551            "redacted": true,
552            "value_type": match payload {
553                Value::Null => "null",
554                Value::Bool(_) => "bool",
555                Value::Number(_) => "number",
556                Value::String(_) => "string",
557                Value::Array(_) => "array",
558                Value::Object(_) => "object",
559            }
560        })
561        .to_string(),
562    }
563}
564
565fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
566    match mode {
567        YamlToolTraceMode::Full => payload.clone(),
568        YamlToolTraceMode::Redacted => json!({
569            "redacted": true,
570            "value_type": json_type_name(payload),
571        }),
572        YamlToolTraceMode::Off => Value::Null,
573    }
574}
575
576fn validate_json_schema(schema: &Value) -> Result<(), String> {
577    jsonschema::JSONSchema::compile(schema)
578        .map(|_| ())
579        .map_err(|error| format!("invalid JSON schema: {error}"))
580}
581
582fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
583    let validator = jsonschema::JSONSchema::compile(schema)
584        .map_err(|error| format!("invalid JSON schema: {error}"))?;
585    if let Err(errors) = validator.validate(instance) {
586        let message = errors
587            .into_iter()
588            .next()
589            .map(|error| error.to_string())
590            .unwrap_or_else(|| "unknown schema validation error".to_string());
591        return Err(format!("schema validation failed: {message}"));
592    }
593    Ok(())
594}
595
596fn schema_type(schema: &Value) -> Option<&str> {
597    schema.get("type").and_then(Value::as_str)
598}
599
600fn schema_expects_object(schema: &Value) -> bool {
601    schema_type(schema) == Some("object")
602}
603
604fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
605    options.trace.context.as_ref().map(|input| TraceContext {
606        trace_id: input.trace_id.clone(),
607        span_id: input.span_id.clone(),
608        parent_span_id: input.parent_span_id.clone(),
609        traceparent: input.traceparent.clone(),
610        tracestate: input.tracestate.clone(),
611        baggage: input.baggage.clone(),
612    })
613}
614
615fn merged_trace_context_for_worker(
616    span_context: Option<&TraceContext>,
617    resolved_trace_id: Option<&str>,
618    options: &YamlWorkflowRunOptions,
619) -> TraceContext {
620    let input_context = options.trace.context.as_ref();
621    let baggage = if let Some(context) = span_context {
622        if !context.baggage.is_empty() {
623            context.baggage.clone()
624        } else {
625            input_context
626                .map(|value| value.baggage.clone())
627                .unwrap_or_default()
628        }
629    } else {
630        input_context
631            .map(|value| value.baggage.clone())
632            .unwrap_or_default()
633    };
634
635    TraceContext {
636        trace_id: span_context
637            .and_then(|context| context.trace_id.clone())
638            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
639            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
640        span_id: span_context
641            .and_then(|context| context.span_id.clone())
642            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
643        parent_span_id: span_context
644            .and_then(|context| context.parent_span_id.clone())
645            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
646        traceparent: span_context
647            .and_then(|context| context.traceparent.clone())
648            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
649        tracestate: span_context
650            .and_then(|context| context.tracestate.clone())
651            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
652        baggage,
653    }
654}
655
656fn custom_worker_context_with_trace(
657    context: &Value,
658    trace_context: &TraceContext,
659    tenant_context: &YamlWorkflowTraceTenantContext,
660) -> Value {
661    let mut context_with_trace = context.clone();
662    let Some(root) = context_with_trace.as_object_mut() else {
663        return context_with_trace;
664    };
665
666    root.insert(
667        "trace".to_string(),
668        json!({
669            "context": {
670                "trace_id": trace_context.trace_id,
671                "span_id": trace_context.span_id,
672                "parent_span_id": trace_context.parent_span_id,
673                "traceparent": trace_context.traceparent,
674                "tracestate": trace_context.tracestate,
675                "baggage": trace_context.baggage,
676            },
677            "tenant": {
678                "workspace_id": tenant_context.workspace_id,
679                "user_id": tenant_context.user_id,
680                "conversation_id": tenant_context.conversation_id,
681                "request_id": tenant_context.request_id,
682                "run_id": tenant_context.run_id,
683            }
684        }),
685    );
686
687    context_with_trace
688}
689
/// True when the `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` environment
/// variable is set to a truthy value (`1`/`true`/`yes`/`on`, case- and
/// whitespace-insensitive); unset or unreadable values mean false.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
699
/// Outcome of `parse_streamed_structured_payload`: the parsed JSON payload
/// plus the jsonish healer's confidence when the healing path was used.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    heal_confidence: Option<f32>,
}
705
/// Accumulates streamed JSON text and, once the stream completes, emits it
/// exactly once re-rendered as `key: value` lines (see
/// `render_json_object_as_text`).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    raw_json: String,
    // Guards against emitting the rendered text more than once.
    emitted: bool,
}
711
712impl StreamJsonAsTextFormatter {
713    fn push(&mut self, chunk: &str) {
714        self.raw_json.push_str(chunk);
715    }
716
717    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
718        if self.emitted || !complete {
719            return None;
720        }
721        self.emitted = true;
722        Some(render_json_object_as_text(self.raw_json.as_str()))
723    }
724}
725
726fn render_json_object_as_text(raw_json: &str) -> String {
727    let value = match serde_json::from_str::<Value>(raw_json) {
728        Ok(value) => value,
729        Err(_) => return raw_json.to_string(),
730    };
731    let Some(object) = value.as_object() else {
732        return raw_json.to_string();
733    };
734
735    let mut lines = Vec::with_capacity(object.len());
736    for (key, value) in object {
737        let rendered = match value {
738            Value::String(text) => text.clone(),
739            _ => value.to_string(),
740        };
741        lines.push(format!("{key}: {rendered}"));
742    }
743    lines.join("\n")
744}
745
/// Classifies a streamed token as user-visible output or model "thinking"
/// text (see `StructuredJsonDeltaFilter::split`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
752
/// Incremental scanner state for splitting a streamed delta into structured
/// JSON output vs surrounding "thinking" text (see `split`).
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    // True once the opening `{` of the top-level object has been seen.
    started: bool,
    // True once that object's matching `}` has closed (depth returned to 0).
    completed: bool,
    // Current brace nesting depth inside the object.
    depth: u32,
    // True while inside a JSON string literal (braces there don't count).
    in_string: bool,
    // True when the previous char was a backslash inside a string.
    escape: bool,
}
761
impl StructuredJsonDeltaFilter {
    /// Splits one streamed delta into (structured output, thinking text).
    ///
    /// Text before the first top-level `{` and after its matching `}` is
    /// classified as "thinking"; everything from the opening brace through the
    /// closing brace (inclusive) is "output". String and escape state is
    /// tracked so braces inside JSON string literals don't affect the depth
    /// count. State persists across calls, so deltas may split the object at
    /// arbitrary character boundaries.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            // After the object closes, everything is trailing "thinking" text.
            if self.completed {
                thinking.push(ch);
                continue;
            }

            // Before the object opens, pass text through until the first '{'.
            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            // Inside a string literal: only track escapes and the closing quote.
            if self.in_string {
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    // Depth back to zero: the top-level object just closed.
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// True once the top-level JSON object has fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
837
/// Returns the trimmed contents of the last ```` ```json ```` fenced block in
/// `raw`, or `None` when no such fence exists, the fence is unterminated, or
/// the block is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    let fence_start = raw.rfind("```json")?;
    let body = &raw[fence_start + "```json".len()..];
    let close = body.find("```")?;
    let candidate = body[..close].trim();
    (!candidate.is_empty()).then_some(candidate)
}
848
/// Returns the brace-balanced `{...}` slice of `raw` starting at byte offset
/// `start_index`, or `None` when the braces never balance. JSON string and
/// escape state is tracked so braces inside string literals are ignored.
///
/// NOTE(review): `start_index` must lie on a char boundary (callers pass
/// offsets from `char_indices`), otherwise the slice below would panic.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth = 0u32;
    let mut in_string = false;
    let mut escape = false;

    for (relative_index, ch) in raw[start_index..].char_indices() {
        // Inside a string literal: only escapes and the closing quote matter.
        if in_string {
            if escape {
                escape = false;
                continue;
            }
            if ch == '\\' {
                escape = true;
                continue;
            }
            if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                // A '}' before any '{' means the span can't be balanced.
                if depth == 0 {
                    return None;
                }
                depth -= 1;
                if depth == 0 {
                    // Matching close brace found: return the full span.
                    let end_index = start_index + relative_index + ch.len_utf8();
                    return Some(raw[start_index..end_index].trim());
                }
            }
            _ => {}
        }
    }

    None
}
889
890fn extract_last_parsable_object(raw: &str) -> Option<&str> {
891    let starts: Vec<usize> = raw
892        .char_indices()
893        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
894        .collect();
895
896    for start in starts.into_iter().rev() {
897        let Some(candidate) = extract_balanced_object_from(raw, start) else {
898            continue;
899        };
900        if serde_json::from_str::<Value>(candidate).is_ok() {
901            return Some(candidate);
902        }
903    }
904
905    None
906}
907
/// Best-effort extraction of a structured-JSON candidate from raw model text:
/// prefer the last ```` ```json ```` fenced block, falling back to the last
/// balanced, parsable `{...}` object anywhere in the text.
fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
}
911
/// Parse the accumulated text of a streamed structured completion into JSON.
///
/// With `heal == false`, parsing is strict: first the entire raw text, then
/// the best embedded candidate (last fenced ```json block, else the last
/// balanced `{...}` object). With `heal == true`, the candidate (or the raw
/// text when no candidate is found) is run through the lenient
/// `JsonishParser`, and its confidence score is surfaced to the caller.
///
/// Errors are human-readable strings describing which stage failed.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the whole payload is already valid JSON.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        // Otherwise try to locate an embedded JSON object and parse strictly.
        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: run the candidate (or raw text) through the lenient parser.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
950
/// One event emitted during YAML workflow execution.
///
/// A single struct covers every event variant; `event_type` discriminates,
/// and all other fields are optional so absent data can be omitted.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    // Discriminator naming the event variant (values produced elsewhere in this module).
    pub event_type: String,
    // Workflow node this event relates to, if any.
    pub node_id: Option<String>,
    // Step identifier within the run, if any.
    pub step_id: Option<String>,
    // Kind of the node (llm_call / switch / custom_worker), if applicable.
    pub node_kind: Option<String>,
    // Whether the node's output can be streamed, if known.
    pub streamable: Option<bool>,
    // Free-form human-readable message, if any.
    pub message: Option<String>,
    // Incremental streamed text for token events, if any.
    pub delta: Option<String>,
    // Classification of a streamed token, if applicable.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    // Whether a streamed token belongs to the terminal node, if known.
    pub is_terminal_node_token: Option<bool>,
    // Elapsed time in milliseconds for timing events, if any.
    pub elapsed_ms: Option<u128>,
    // Arbitrary extra JSON payload, if any.
    pub metadata: Option<Value>,
}
965
/// Role of a workflow chat message; re-uses the core `Role` type.
pub type WorkflowMessageRole = Role;

/// A chat message passed into or produced by a workflow run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    // Optional sender/tool name; defaults to `None` when absent.
    #[serde(default)]
    pub name: Option<String>,
    // Accepts both snake_case and camelCase (`toolCallId`) on deserialization.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
977
/// Record of one template expression resolved against workflow state,
/// kept for diagnostics/observability.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    // Position of the expression within the template.
    pub index: usize,
    // The raw template expression as written.
    pub expression: String,
    // Path the expression resolved against.
    pub source_path: String,
    // Value the expression resolved to.
    pub resolved: Value,
    // Type name of the resolved value (string form).
    pub resolved_type: String,
    // True when the expression did not resolve to a value.
    pub missing: bool,
}
987
/// Severity level of a workflow validation diagnostic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}
993
/// A single validation finding for a workflow definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    // Node the diagnostic refers to; `None` for workflow-level findings.
    pub node_id: Option<String>,
    // Stable machine-readable diagnostic code.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    // Human-readable explanation.
    pub message: String,
}
1001
/// Errors surfaced while loading, parsing, validating, or executing a YAML
/// workflow. The `#[error]` attributes define the user-facing messages.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    // Carries the full diagnostic list so callers can report every finding.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1046
/// Receiver for events emitted during workflow execution.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Deliver one event to the sink.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Whether the consumer has requested cancellation. Defaults to `false`.
    fn is_cancelled(&self) -> bool {
        false
    }
}
1054
/// Event sink that discards every event; use when no observer is attached.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1060
/// Message used when a run stops because the event sink reported cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    "workflow event callback cancelled"
}
1064
1065fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1066    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1067}
1068
/// Errors raised while converting a YAML workflow into the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1078
1079/// Render a YAML workflow graph as Mermaid flowchart.
1080pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1081    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1082        return workflow_to_mermaid(&ir);
1083    }
1084
1085    yaml_workflow_to_mermaid_fallback(workflow)
1086}
1087
/// Fallback Mermaid rendering built directly from the YAML graph, used when
/// the workflow cannot be converted to canonical IR.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    // Emit one rectangular node per workflow node, plus a stadium-shaped node
    // for every tool declared on an llm_call, linked with a dotted edge.
    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "  {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "  {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "  {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Collect edges keyed by (from, label, to) to de-duplicate; plain edges
    // carry an empty label.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labelled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output regardless of HashSet iteration order.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "  {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "  {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    // Style every synthesized tool node with a shared class.
    if !tool_node_ids.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1169
1170fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1171    llm.tools
1172        .iter()
1173        .map(|tool| match tool {
1174            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1175            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1176        })
1177        .collect()
1178}
1179
/// Load a YAML workflow file and render it as Mermaid flowchart.
///
/// When the workflow references sibling workflows (via `run_workflow_graph`
/// tools), they are rendered as linked subgraphs; otherwise the single-graph
/// renderer is used.
///
/// # Errors
/// Returns `Read` when the file cannot be read and `Parse` when it is not
/// valid workflow YAML; subgraph discovery can also fail with `Read`/`Parse`.
pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
    if referenced_subgraphs.is_empty() {
        return Ok(yaml_workflow_to_mermaid(&workflow));
    }

    Ok(yaml_workflow_to_mermaid_with_subgraphs(
        &workflow,
        &referenced_subgraphs,
    ))
}
1204
/// A sibling workflow referenced by the main workflow, paired with the alias
/// (the `workflow_id` string) it was referenced under.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    alias: String,
    workflow: YamlWorkflow,
}
1210
/// Intermediate output of rendering one workflow as a Mermaid block.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    // Mermaid statements (unindented) for this block.
    lines: Vec<String>,
    // Ids of all synthesized tool nodes, for class styling.
    tool_node_ids: Vec<String>,
    // Subset of tool node ids whose tool name is `run_workflow_graph`.
    run_workflow_tool_node_ids: Vec<String>,
    // Prefixed Mermaid id of the workflow's entry node.
    entry_node_id: String,
}
1218
/// Render the main workflow plus its referenced sibling workflows as one
/// Mermaid flowchart with `subgraph` clusters, linking every
/// `run_workflow_graph` tool node in the main graph to each subgraph's entry.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Main workflow cluster.
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "  subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("    {line}"));
    }
    lines.push("  end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    // One cluster per referenced subgraph; ids are prefixed per block so the
    // same node id can appear in multiple workflows without collision.
    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "  subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("    {line}"));
        }
        lines.push("  end".to_string());

        // Dotted "calls" edge from every run_workflow_graph tool node in the
        // main graph to this subgraph's entry node.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "  {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    // Shared styling for all synthesized tool nodes.
    if !all_tool_nodes.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1269
/// Render one workflow's nodes and edges as Mermaid statements, with every
/// node id prefixed by `prefix` so multiple workflows can coexist in one
/// diagram.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    // Emit node declarations, plus a stadium-shaped node per declared tool.
    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                // run_workflow_graph tools are tracked separately so callers
                // can link them to the referenced subgraphs.
                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // De-duplicate edges by (from, label, to); plain edges have empty labels.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labelled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1345
/// Resolve the workflows referenced by `workflow` against YAML files that
/// live next to it on disk.
///
/// Only references that actually match a sibling workflow are returned; each
/// matched workflow appears at most once (keyed by its normalized id).
fn discover_referenced_subgraphs(
    workflow_path: &Path,
    workflow: &YamlWorkflow,
) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
    let workflow_ids = referenced_workflow_ids(workflow);
    if workflow_ids.is_empty() {
        return Ok(Vec::new());
    }

    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;

    let mut discovered = Vec::new();
    let mut seen = HashSet::new();

    for workflow_id in workflow_ids {
        let normalized = normalize_workflow_lookup_key(&workflow_id);
        if seen.contains(&normalized) {
            continue;
        }

        // Note: `seen` is only updated on a successful match, so an id that
        // failed to resolve is looked up again if it reappears.
        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
        {
            discovered.push(MermaidSubgraphWorkflow {
                alias: workflow_id.clone(),
                workflow: subworkflow.clone(),
            });
            seen.insert(normalized);
        }
    }

    Ok(discovered)
}
1379
/// Parse every other `.yaml`/`.yml` file in `parent_dir` as a workflow.
///
/// Each sibling is registered under two normalized lookup keys: its file
/// stem and its declared workflow id. The file at `workflow_path` itself is
/// skipped.
///
/// # Errors
/// Fails with `Read`/`Parse` if the directory cannot be listed or any
/// sibling YAML file cannot be read or parsed.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // Don't treat the main workflow as its own sibling.
        if path == workflow_path {
            continue;
        }

        let contents =
            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
                path: path.display().to_string(),
                source,
            })?;
        let subworkflow: YamlWorkflow =
            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
                path: path.display().to_string(),
                source,
            })?;

        // Register under the file stem and under the declared workflow id.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1423
/// Collect the workflow ids this workflow references through
/// `run_workflow_graph` tools, de-duplicated by normalized key while
/// preserving first-seen order.
///
/// Ids come from two places: the tool's JSON schema (`const`/`enum` on the
/// `workflow_id` property) and the node's prompt text. Note the prompt scan
/// runs inside the tool loop, so prompt ids are only collected when the node
/// declares at least one `run_workflow_graph` tool (and are re-scanned per
/// such tool; `seen` removes the duplicates).
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    let normalized = normalize_workflow_lookup_key(&workflow_id);
                    if seen.insert(normalized) {
                        ids.push(workflow_id);
                    }
                }

                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        let normalized = normalize_workflow_lookup_key(&workflow_id);
                        if seen.insert(normalized) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1461
1462fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1463    let schema = match tool {
1464        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1465        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1466    };
1467
1468    let mut ids = Vec::new();
1469    if let Some(schema) = schema {
1470        if let Some(workflow_prop) = schema
1471            .get("properties")
1472            .and_then(Value::as_object)
1473            .and_then(|properties| properties.get("workflow_id"))
1474        {
1475            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1476                ids.push(value.to_string());
1477            }
1478
1479            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1480                for value in enum_values {
1481                    if let Some(value) = value.as_str() {
1482                        ids.push(value.to_string());
1483                    }
1484                }
1485            }
1486        }
1487    }
1488
1489    ids
1490}
1491
/// Scan free-form prompt text for `"workflow_id": "<id>"` pairs and return
/// the non-empty ids in order of appearance.
///
/// Only double-quoted string values are collected; non-string values are
/// skipped and scanning continues after the colon.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut found = Vec::new();
    let mut cursor = prompt;

    while let Some(key_pos) = cursor.find(KEY) {
        let after_key = &cursor[key_pos + KEY.len()..];
        let Some(colon_pos) = after_key.find(':') else {
            break;
        };

        let after_colon = &after_key[colon_pos + 1..];
        let quoted = after_colon
            .trim_start()
            .strip_prefix('"')
            .and_then(|inner| inner.find('"').map(|quote_pos| (inner, quote_pos)));

        match quoted {
            Some((inner, quote_pos)) => {
                let workflow_id = inner[..quote_pos].trim();
                if !workflow_id.is_empty() {
                    found.push(workflow_id.to_string());
                }
                // Resume scanning just past the closing quote.
                cursor = &inner[quote_pos + 1..];
            }
            // No quoted value: resume scanning right after the colon.
            None => cursor = after_colon,
        }
    }

    found
}
1519
/// Normalize a workflow id for case-insensitive lookup: hyphens become
/// underscores and ASCII letters are lowercased.
fn normalize_workflow_lookup_key(value: &str) -> String {
    let mut normalized = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '-' {
            normalized.push('_');
        } else {
            normalized.push(ch.to_ascii_lowercase());
        }
    }
    normalized
}
1529
/// True when the path has a (case-sensitive) `yaml` or `yml` extension.
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1536
1537fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1538    sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1539}
1540
/// Borrow the tool's name regardless of which declaration form was used.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1547
/// Convert a YAML workflow graph into the canonical IR `WorkflowDefinition`.
///
/// A synthetic start node (`YAML_START_NODE_ID`) is prepended that points at
/// the YAML entry node. `llm_call` nodes become IR tool nodes invoking
/// `YAML_LLM_TOOL_ID` with the call parameters packed into the tool input;
/// `custom_worker` nodes become tool nodes for their handler; `switch` nodes
/// become router nodes with rewritten conditions.
///
/// # Errors
/// - `MissingEntry` when the entry node does not exist.
/// - `UnsupportedNode` for `set_globals`/`update_globals`, `llm_call.tools`,
///   or node types other than llm_call/switch/custom_worker (none of which
///   are representable in the canonical IR yet).
/// - `MultipleOutgoingEdge` (via `single_next_for_node`) when an llm/tool
///   node has more than one outgoing YAML edge.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    // The entry node must exist before we translate anything.
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency map: node id -> outgoing edge targets.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    // Synthetic start node pointing at the YAML entry node.
    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        // llm_call -> IR tool node invoking the internal LLM tool.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet; reject early.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    // All llm_call parameters travel as the tool input payload.
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        // custom_worker -> IR tool node for the named handler.
        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as llm_call nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        // switch -> IR router node with rewritten branch conditions.
        if let Some(switch) = node.node_type.switch.as_ref() {
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        // Any other node type has no IR equivalent.
        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1695
1696fn single_next_for_node(
1697    outgoing: &HashMap<&str, Vec<&str>>,
1698    node_id: &str,
1699) -> Result<Option<String>, YamlToIrError> {
1700    match outgoing.get(node_id) {
1701        None => Ok(None),
1702        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1703        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1704            node_id: node_id.to_string(),
1705        }),
1706    }
1707}
1708
/// Rewrite a YAML switch condition into IR path syntax:
/// `$.nodes.X.output.Y` becomes `$.node_outputs.X.Y`, and a trailing
/// `.output` segment is dropped.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    match rewritten.strip_suffix(".output") {
        Some(trimmed) => trimmed.to_string(),
        None => rewritten,
    }
}
1719
/// Make an arbitrary id safe for Mermaid: prepend `n_` unless the id starts
/// with an ASCII letter or `_`, then replace every character outside
/// `[A-Za-z0-9_]` with `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_safely = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');
    let prefix = if starts_safely { "" } else { "n_" };

    format!("{prefix}{id}")
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '_' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
1743
/// Escape double quotes so the label is safe inside a quoted Mermaid string.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push_str("\\\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
1747
/// Fully resolved parameters for one LLM call issued by a YAML workflow node,
/// handed to the `YamlWorkflowLlmExecutor` implementation.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    pub node_id: String,
    // True when this node is the workflow's terminal node.
    pub is_terminal_node: bool,
    // Whether JSON output should be streamed as plain text tokens.
    pub stream_json_as_text: bool,
    pub model: String,
    // Explicit conversation history, when supplied; otherwise built elsewhere.
    pub messages: Option<Vec<Message>>,
    // Whether the rendered prompt is appended as a user message.
    pub append_prompt_as_user: bool,
    // Prompt after template resolution.
    pub prompt: String,
    // Prompt before template resolution.
    pub prompt_template: String,
    // Per-expression resolution records for observability.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    // JSON schema the structured output must satisfy.
    pub schema: Value,
    pub stream: bool,
    // Whether lenient JSON healing is enabled for the response.
    pub heal: bool,
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    // Cap on tool-call/response round trips.
    pub max_tool_roundtrips: u8,
    // Global key under which tool calls are recorded, if any.
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    // Snapshot of workflow state available to the call.
    pub execution_context: Value,
    // NOTE(review): presumably the original email input for email workflows —
    // confirm against the runner that populates this.
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub trace_sampled: bool,
}
1773
/// A tool declaration resolved into a concrete `ToolDefinition`, plus an
/// optional schema the tool's output is expected to satisfy.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    pub output_schema: Option<Value>,
}
1779
/// Executes structured LLM calls on behalf of YAML workflow nodes.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Run one structured completion; streaming/progress events may be
    /// forwarded to `event_sink` when one is provided. Errors are returned
    /// as human-readable strings.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1788
/// Executes `custom_worker` nodes by handler name.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Run the handler with its configured payload, the workflow's email
    /// text, and the current execution context; returns the node's output
    /// value, or a human-readable error string.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1799
/// Read and parse a workflow YAML file, then run it with the given input and
/// LLM executor (delegates to `run_workflow_yaml`).
///
/// # Errors
/// `Read`/`Parse` when the file cannot be loaded, plus any error produced by
/// the workflow run itself.
pub async fn run_workflow_yaml_file(
    workflow_path: &Path,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml(&workflow, workflow_input, executor).await
}
1819
1820pub async fn run_email_workflow_yaml_file(
1821    workflow_path: &Path,
1822    email_text: &str,
1823    executor: &dyn YamlWorkflowLlmExecutor,
1824) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1825    let workflow_input = json!({ "email_text": email_text });
1826    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1827}
1828
/// Read and parse a workflow YAML file, then run it against a
/// `SimpleAgentsClient` (delegates to `run_workflow_yaml_with_client`).
///
/// # Errors
/// `Read`/`Parse` when the file cannot be loaded, plus any error produced by
/// the workflow run itself.
pub async fn run_workflow_yaml_file_with_client(
    workflow_path: &Path,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml_with_client(&workflow, workflow_input, client).await
}
1848
1849pub async fn run_email_workflow_yaml_file_with_client(
1850    workflow_path: &Path,
1851    email_text: &str,
1852    client: &SimpleAgentsClient,
1853) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1854    let workflow_input = json!({ "email_text": email_text });
1855    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1856}
1857
1858pub async fn run_workflow_yaml_with_client(
1859    workflow: &YamlWorkflow,
1860    workflow_input: &Value,
1861    client: &SimpleAgentsClient,
1862) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1863    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
1864}
1865
1866pub async fn run_email_workflow_yaml_with_client(
1867    workflow: &YamlWorkflow,
1868    email_text: &str,
1869    client: &SimpleAgentsClient,
1870) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1871    let workflow_input = json!({ "email_text": email_text });
1872    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1873}
1874
1875pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1876    workflow_path: &Path,
1877    workflow_input: &Value,
1878    client: &SimpleAgentsClient,
1879    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1880) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1881    let contents =
1882        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1883            path: workflow_path.display().to_string(),
1884            source,
1885        })?;
1886
1887    let workflow: YamlWorkflow =
1888        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1889            path: workflow_path.display().to_string(),
1890            source,
1891        })?;
1892
1893    run_workflow_yaml_with_client_and_custom_worker(
1894        &workflow,
1895        workflow_input,
1896        client,
1897        custom_worker,
1898    )
1899    .await
1900}
1901
1902pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1903    workflow_path: &Path,
1904    email_text: &str,
1905    client: &SimpleAgentsClient,
1906    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1907) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1908    let workflow_input = json!({ "email_text": email_text });
1909    run_workflow_yaml_file_with_client_and_custom_worker(
1910        workflow_path,
1911        &workflow_input,
1912        client,
1913        custom_worker,
1914    )
1915    .await
1916}
1917
1918pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1919    workflow_path: &Path,
1920    workflow_input: &Value,
1921    client: &SimpleAgentsClient,
1922    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1923    event_sink: Option<&dyn YamlWorkflowEventSink>,
1924) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1925    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1926        workflow_path,
1927        workflow_input,
1928        client,
1929        custom_worker,
1930        event_sink,
1931        &YamlWorkflowRunOptions::default(),
1932    )
1933    .await
1934}
1935
1936pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1937    workflow_path: &Path,
1938    workflow_input: &Value,
1939    client: &SimpleAgentsClient,
1940    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1941    event_sink: Option<&dyn YamlWorkflowEventSink>,
1942    options: &YamlWorkflowRunOptions,
1943) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1944    let contents =
1945        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1946            path: workflow_path.display().to_string(),
1947            source,
1948        })?;
1949
1950    let workflow: YamlWorkflow =
1951        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1952            path: workflow_path.display().to_string(),
1953            source,
1954        })?;
1955
1956    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1957        &workflow,
1958        workflow_input,
1959        client,
1960        custom_worker,
1961        event_sink,
1962        options,
1963    )
1964    .await
1965}
1966
1967pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1968    workflow_path: &Path,
1969    email_text: &str,
1970    client: &SimpleAgentsClient,
1971    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1972    event_sink: Option<&dyn YamlWorkflowEventSink>,
1973) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1974    let workflow_input = json!({ "email_text": email_text });
1975    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1976        workflow_path,
1977        &workflow_input,
1978        client,
1979        custom_worker,
1980        event_sink,
1981    )
1982    .await
1983}
1984
1985pub async fn run_workflow_yaml_with_client_and_custom_worker(
1986    workflow: &YamlWorkflow,
1987    workflow_input: &Value,
1988    client: &SimpleAgentsClient,
1989    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1990) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1991    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1992        workflow,
1993        workflow_input,
1994        client,
1995        custom_worker,
1996        None,
1997        &YamlWorkflowRunOptions::default(),
1998    )
1999    .await
2000}
2001
2002pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
2003    workflow: &YamlWorkflow,
2004    email_text: &str,
2005    client: &SimpleAgentsClient,
2006    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2007) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2008    let workflow_input = json!({ "email_text": email_text });
2009    run_workflow_yaml_with_client_and_custom_worker(
2010        workflow,
2011        &workflow_input,
2012        client,
2013        custom_worker,
2014    )
2015    .await
2016}
2017
2018pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2019    workflow: &YamlWorkflow,
2020    workflow_input: &Value,
2021    client: &SimpleAgentsClient,
2022    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2023    event_sink: Option<&dyn YamlWorkflowEventSink>,
2024) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2025    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2026        workflow,
2027        workflow_input,
2028        client,
2029        custom_worker,
2030        event_sink,
2031        &YamlWorkflowRunOptions::default(),
2032    )
2033    .await
2034}
2035
2036pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2037    workflow: &YamlWorkflow,
2038    workflow_input: &Value,
2039    client: &SimpleAgentsClient,
2040    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2041    event_sink: Option<&dyn YamlWorkflowEventSink>,
2042    options: &YamlWorkflowRunOptions,
2043) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2044    struct BorrowedClientExecutor<'a> {
2045        client: &'a SimpleAgentsClient,
2046        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2047        run_options: YamlWorkflowRunOptions,
2048    }
2049
2050    #[async_trait]
2051    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2052        async fn complete_structured(
2053            &self,
2054            request: YamlLlmExecutionRequest,
2055            event_sink: Option<&dyn YamlWorkflowEventSink>,
2056        ) -> Result<YamlLlmExecutionResult, String> {
2057            let expects_object = schema_expects_object(&request.schema);
2058            let messages = if let Some(mut history) = request.messages.clone() {
2059                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2060                    history.push(Message::user(&request.prompt));
2061                }
2062                history
2063            } else {
2064                vec![
2065                    Message::system("You execute workflow classification steps."),
2066                    Message::user(&request.prompt),
2067                ]
2068            };
2069
2070            if !request.tools.is_empty() {
2071                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2072                let mut conversation = messages;
2073                let mut usage_total: Option<YamlLlmTokenUsage> = None;
2074
2075                for roundtrip in 0..=request.max_tool_roundtrips {
2076                    let mut builder = CompletionRequest::builder()
2077                        .model(&request.model)
2078                        .messages(conversation.clone())
2079                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2080
2081                    if request.heal && expects_object {
2082                        builder = builder.json_schema("workflow_step", request.schema.clone());
2083                    }
2084
2085                    if let Some(choice) = request.tool_choice.clone() {
2086                        builder = builder.tool_choice(choice);
2087                    }
2088
2089                    if request.stream {
2090                        builder = builder.stream(true);
2091                    }
2092
2093                    let completion_request = builder
2094                        .build()
2095                        .map_err(|error| format!("failed to build completion request: {error}"))?;
2096
2097                    let outcome = self
2098                        .client
2099                        .complete(&completion_request, CompletionOptions::default())
2100                        .await
2101                        .map_err(|error| error.to_string())?;
2102
2103                    let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2104                    let mut streamed_content = String::new();
2105                    let mut finish_reason = FinishReason::Stop;
2106
2107                    match outcome {
2108                        CompletionOutcome::Response(response) => {
2109                            if let Some(usage) = usage_total.as_mut() {
2110                                usage.prompt_tokens += response.usage.prompt_tokens;
2111                                usage.completion_tokens += response.usage.completion_tokens;
2112                                usage.total_tokens += response.usage.total_tokens;
2113                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2114                                    usage.reasoning_tokens = Some(
2115                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2116                                    );
2117                                }
2118                            } else {
2119                                usage_total = Some(YamlLlmTokenUsage {
2120                                    prompt_tokens: response.usage.prompt_tokens,
2121                                    completion_tokens: response.usage.completion_tokens,
2122                                    total_tokens: response.usage.total_tokens,
2123                                    reasoning_tokens: response.usage.reasoning_tokens,
2124                                });
2125                            }
2126
2127                            let choice = response
2128                                .choices
2129                                .first()
2130                                .ok_or_else(|| "completion returned no choices".to_string())?;
2131                            streamed_content = choice.message.content.clone();
2132                            streamed_tool_calls = choice.message.tool_calls.clone();
2133                            finish_reason = choice.finish_reason;
2134                        }
2135                        CompletionOutcome::HealedJson(healed) => {
2136                            let response = healed.response;
2137                            if let Some(usage) = usage_total.as_mut() {
2138                                usage.prompt_tokens += response.usage.prompt_tokens;
2139                                usage.completion_tokens += response.usage.completion_tokens;
2140                                usage.total_tokens += response.usage.total_tokens;
2141                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2142                                    usage.reasoning_tokens = Some(
2143                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2144                                    );
2145                                }
2146                            } else {
2147                                usage_total = Some(YamlLlmTokenUsage {
2148                                    prompt_tokens: response.usage.prompt_tokens,
2149                                    completion_tokens: response.usage.completion_tokens,
2150                                    total_tokens: response.usage.total_tokens,
2151                                    reasoning_tokens: response.usage.reasoning_tokens,
2152                                });
2153                            }
2154
2155                            let choice = response
2156                                .choices
2157                                .first()
2158                                .ok_or_else(|| "completion returned no choices".to_string())?;
2159                            streamed_content = choice.message.content.clone();
2160                            streamed_tool_calls = choice.message.tool_calls.clone();
2161                            finish_reason = choice.finish_reason;
2162                        }
2163                        CompletionOutcome::CoercedSchema(coerced) => {
2164                            let response = coerced.response;
2165                            if let Some(usage) = usage_total.as_mut() {
2166                                usage.prompt_tokens += response.usage.prompt_tokens;
2167                                usage.completion_tokens += response.usage.completion_tokens;
2168                                usage.total_tokens += response.usage.total_tokens;
2169                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2170                                    usage.reasoning_tokens = Some(
2171                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2172                                    );
2173                                }
2174                            } else {
2175                                usage_total = Some(YamlLlmTokenUsage {
2176                                    prompt_tokens: response.usage.prompt_tokens,
2177                                    completion_tokens: response.usage.completion_tokens,
2178                                    total_tokens: response.usage.total_tokens,
2179                                    reasoning_tokens: response.usage.reasoning_tokens,
2180                                });
2181                            }
2182
2183                            let choice = response
2184                                .choices
2185                                .first()
2186                                .ok_or_else(|| "completion returned no choices".to_string())?;
2187                            streamed_content = choice.message.content.clone();
2188                            streamed_tool_calls = choice.message.tool_calls.clone();
2189                            finish_reason = choice.finish_reason;
2190                        }
2191                        CompletionOutcome::Stream(mut stream) => {
2192                            let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2193                                None;
2194                            let mut delta_filter = StructuredJsonDeltaFilter::default();
2195                            let include_raw_debug = include_raw_stream_debug_events();
2196                            let mut json_text_formatter = if request.stream_json_as_text {
2197                                Some(StreamJsonAsTextFormatter::default())
2198                            } else {
2199                                None
2200                            };
2201                            let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2202
2203                            while let Some(chunk_result) = stream.next().await {
2204                                if event_sink_is_cancelled(event_sink) {
2205                                    return Err(workflow_event_sink_cancelled_message().to_string());
2206                                }
2207
2208                                let chunk = chunk_result.map_err(|error| error.to_string())?;
2209                                if let Some(usage) = chunk.usage {
2210                                    final_stream_usage = Some(usage);
2211                                }
2212
2213                                if let Some(choice) = chunk.choices.first() {
2214                                    if let Some(chunk_finish_reason) = choice.finish_reason {
2215                                        finish_reason = chunk_finish_reason;
2216                                    }
2217
2218                                    if include_raw_debug {
2219                                        if let Some(reasoning_delta) =
2220                                            choice.delta.reasoning_content.as_ref()
2221                                        {
2222                                            if let Some(sink) = event_sink {
2223                                                sink.emit(&YamlWorkflowEvent {
2224                                                    event_type: "node_stream_thinking_delta"
2225                                                        .to_string(),
2226                                                    node_id: Some(request.node_id.clone()),
2227                                                    step_id: Some(request.node_id.clone()),
2228                                                    node_kind: Some("llm_call".to_string()),
2229                                                    streamable: Some(true),
2230                                                    message: None,
2231                                                    delta: Some(reasoning_delta.clone()),
2232                                                    token_kind: Some(
2233                                                        YamlWorkflowTokenKind::Thinking,
2234                                                    ),
2235                                                    is_terminal_node_token: Some(
2236                                                        request.is_terminal_node,
2237                                                    ),
2238                                                    elapsed_ms: None,
2239                                                    metadata: None,
2240                                                });
2241                                            }
2242                                        }
2243                                    }
2244
2245                                    if let Some(delta) = choice.delta.content.clone() {
2246                                        streamed_content.push_str(delta.as_str());
2247                                        let (output_delta, thinking_delta) = if expects_object {
2248                                            delta_filter.split(delta.as_str())
2249                                        } else {
2250                                            (Some(delta.clone()), None)
2251                                        };
2252                                        let rendered_output_delta = if let Some(output_chunk) =
2253                                            output_delta
2254                                        {
2255                                            if let Some(formatter) = json_text_formatter.as_mut() {
2256                                                formatter.push(output_chunk.as_str());
2257                                                formatter.emit_if_ready(delta_filter.completed())
2258                                            } else {
2259                                                Some(output_chunk)
2260                                            }
2261                                        } else {
2262                                            None
2263                                        };
2264
2265                                        if include_raw_debug {
2266                                            if let Some(sink) = event_sink {
2267                                                if let Some(raw_thinking_delta) =
2268                                                    thinking_delta.as_ref()
2269                                                {
2270                                                    sink.emit(&YamlWorkflowEvent {
2271                                                        event_type: "node_stream_thinking_delta"
2272                                                            .to_string(),
2273                                                        node_id: Some(request.node_id.clone()),
2274                                                        step_id: Some(request.node_id.clone()),
2275                                                        node_kind: Some("llm_call".to_string()),
2276                                                        streamable: Some(true),
2277                                                        message: None,
2278                                                        delta: Some(raw_thinking_delta.clone()),
2279                                                        token_kind: Some(
2280                                                            YamlWorkflowTokenKind::Thinking,
2281                                                        ),
2282                                                        is_terminal_node_token: Some(
2283                                                            request.is_terminal_node,
2284                                                        ),
2285                                                        elapsed_ms: None,
2286                                                        metadata: None,
2287                                                    });
2288                                                }
2289                                                if let Some(raw_output_delta) =
2290                                                    rendered_output_delta.as_ref()
2291                                                {
2292                                                    sink.emit(&YamlWorkflowEvent {
2293                                                        event_type: "node_stream_output_delta"
2294                                                            .to_string(),
2295                                                        node_id: Some(request.node_id.clone()),
2296                                                        step_id: Some(request.node_id.clone()),
2297                                                        node_kind: Some("llm_call".to_string()),
2298                                                        streamable: Some(true),
2299                                                        message: None,
2300                                                        delta: Some(raw_output_delta.clone()),
2301                                                        token_kind: Some(
2302                                                            YamlWorkflowTokenKind::Output,
2303                                                        ),
2304                                                        is_terminal_node_token: Some(
2305                                                            request.is_terminal_node,
2306                                                        ),
2307                                                        elapsed_ms: None,
2308                                                        metadata: None,
2309                                                    });
2310                                                }
2311                                            }
2312                                        }
2313
2314                                        if let Some(filtered_delta) = rendered_output_delta {
2315                                            if let Some(sink) = event_sink {
2316                                                sink.emit(&YamlWorkflowEvent {
2317                                                    event_type: "node_stream_delta".to_string(),
2318                                                    node_id: Some(request.node_id.clone()),
2319                                                    step_id: Some(request.node_id.clone()),
2320                                                    node_kind: Some("llm_call".to_string()),
2321                                                    streamable: Some(true),
2322                                                    message: None,
2323                                                    delta: Some(filtered_delta),
2324                                                    token_kind: Some(YamlWorkflowTokenKind::Output),
2325                                                    is_terminal_node_token: Some(
2326                                                        request.is_terminal_node,
2327                                                    ),
2328                                                    elapsed_ms: None,
2329                                                    metadata: None,
2330                                                });
2331                                            }
2332                                        }
2333                                    }
2334
2335                                    if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2336                                    {
2337                                        for tool_call_delta in tool_call_deltas {
2338                                            let entry = tool_calls_by_index
2339                                                .entry(tool_call_delta.index)
2340                                                .or_insert_with(|| ToolCall {
2341                                                    id: tool_call_delta.id.clone().unwrap_or_else(
2342                                                        || {
2343                                                            format!(
2344                                                                "tool_call_{}",
2345                                                                tool_call_delta.index
2346                                                            )
2347                                                        },
2348                                                    ),
2349                                                    tool_type: ToolType::Function,
2350                                                    function:
2351                                                        simple_agent_type::tool::ToolCallFunction {
2352                                                            name: String::new(),
2353                                                            arguments: String::new(),
2354                                                        },
2355                                                });
2356
2357                                            if let Some(id) = tool_call_delta.id.as_ref() {
2358                                                entry.id = id.clone();
2359                                            }
2360                                            if let Some(tool_type) = tool_call_delta.tool_type {
2361                                                entry.tool_type = tool_type;
2362                                            }
2363                                            if let Some(function_delta) =
2364                                                tool_call_delta.function.as_ref()
2365                                            {
2366                                                if let Some(name) = function_delta.name.as_ref() {
2367                                                    entry.function.name = name.clone();
2368                                                }
2369                                                if let Some(arguments) =
2370                                                    function_delta.arguments.as_ref()
2371                                                {
2372                                                    entry.function.arguments.push_str(arguments);
2373                                                }
2374                                            }
2375                                        }
2376                                    }
2377                                }
2378
2379                                if event_sink_is_cancelled(event_sink) {
2380                                    return Err(workflow_event_sink_cancelled_message().to_string());
2381                                }
2382                            }
2383
                            // Fold the provider's final streamed usage report into the running
                            // per-node total (usage accumulates across tool roundtrips).
                            if let Some(usage) = final_stream_usage {
                                if let Some(total) = usage_total.as_mut() {
                                    total.prompt_tokens += usage.prompt_tokens;
                                    total.completion_tokens += usage.completion_tokens;
                                    total.total_tokens += usage.total_tokens;
                                    // Reasoning tokens are optional; only sum when reported,
                                    // treating an absent running value as zero.
                                    if let Some(reasoning_tokens) = usage.reasoning_tokens {
                                        total.reasoning_tokens = Some(
                                            total.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
                                        );
                                    }
                                } else {
                                    // First roundtrip with usage data: seed the total.
                                    usage_total = Some(YamlLlmTokenUsage {
                                        prompt_tokens: usage.prompt_tokens,
                                        completion_tokens: usage.completion_tokens,
                                        total_tokens: usage.total_tokens,
                                        reasoning_tokens: usage.reasoning_tokens,
                                    });
                                }
                            }

                            // Streamed tool-call fragments were accumulated keyed by index;
                            // sort by that index to restore the provider's call order before
                            // executing them. An empty map leaves `streamed_tool_calls` as-is.
                            let mut ordered_tool_calls =
                                tool_calls_by_index.into_iter().collect::<Vec<_>>();
                            ordered_tool_calls.sort_by_key(|(index, _)| *index);
                            if !ordered_tool_calls.is_empty() {
                                streamed_tool_calls = Some(
                                    ordered_tool_calls
                                        .into_iter()
                                        .map(|(_, tool_call)| tool_call)
                                        .collect::<Vec<_>>(),
                                );
                            }
2415                        }
2416                    }
2417
                    let has_tool_calls = streamed_tool_calls
                        .as_ref()
                        .is_some_and(|calls| !calls.is_empty());
                    // No tool calls were requested this roundtrip: the model produced a
                    // final answer, so parse it and return. Both conditions are checked
                    // because some providers emit tool calls without the matching
                    // finish_reason (and vice versa) — NOTE(review): presumed rationale,
                    // confirm against provider behavior.
                    if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
                        let payload = if expects_object {
                            parse_streamed_structured_payload(
                                streamed_content.as_str(),
                                request.heal,
                            )
                            .map_err(|error| {
                                format!("failed to parse structured completion JSON: {error}")
                            })?
                            .payload
                        } else {
                            Value::String(streamed_content.clone())
                        };
                        return Ok(YamlLlmExecutionResult {
                            payload,
                            usage: usage_total,
                            ttft_ms: None,
                            tool_calls: tool_traces,
                        });
                    }

                    // Tool calls are pending but we have hit the configured roundtrip
                    // budget: fail rather than loop indefinitely.
                    if roundtrip >= request.max_tool_roundtrips {
                        return Err(format!(
                            "tool call roundtrip limit reached for node '{}' (max={})",
                            request.node_id, request.max_tool_roundtrips
                        ));
                    }

                    // At this point tool calls must exist (finish_reason said so).
                    let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
                        "finish_reason=tool_calls but no tool calls found".to_string()
                    })?;
                    // Reject calls whose function name never arrived in the stream —
                    // executing a nameless tool is impossible.
                    if tool_calls
                        .iter()
                        .any(|tool_call| tool_call.function.name.trim().is_empty())
                    {
                        return Err("streamed tool call missing function name".to_string());
                    }

                    // Record the assistant turn (content + tool calls) in the running
                    // conversation so the follow-up completion sees what was requested.
                    let assistant_tool_message =
                        Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
                    conversation.push(assistant_tool_message);
2462
                    // Execute each requested tool call in order. Every call gets: JSON
                    // argument parsing, an optional tracing span, trace-mode events,
                    // dispatch to the built-in subworkflow tool or the custom worker,
                    // optional output-schema validation, and a `tool` message appended
                    // to the conversation. Any failure aborts the whole node.
                    for tool_call in tool_calls {
                        let tool_call_id = tool_call.id.clone();
                        let tool_name = tool_call.function.name.clone();
                        let tool_started = Instant::now();
                        // Tool arguments arrive as a raw string; they must be valid JSON.
                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
                            .map_err(|error| {
                                format!(
                                    "tool '{}' arguments must be valid JSON: {}",
                                    tool_name, error
                                )
                            })?;
                        // Start a per-tool span only when this run is trace-sampled; the
                        // span context is kept separately so it can parent subworkflows.
                        let mut tool_span_context: Option<TraceContext> = None;
                        let mut tool_span = if request.trace_sampled {
                            let (span_context, mut span) = workflow_tracer().start_span(
                                "workflow.tool.execute",
                                SpanKind::Node,
                                request.trace_context.as_ref(),
                            );
                            tool_span_context = Some(span_context);
                            span.set_attribute(
                                "trace_id",
                                request.trace_id.as_deref().unwrap_or_default(),
                            );
                            span.set_attribute("node_id", request.node_id.as_str());
                            span.set_attribute("node_kind", "llm_call");
                            span.set_attribute("tool_name", tool_name.as_str());
                            span.set_attribute("tool_call_id", tool_call_id.as_str());
                            // Arguments are redacted/shaped per the trace mode before
                            // being attached to the span.
                            let args_for_span =
                                payload_for_tool_trace(request.tool_trace_mode, &arguments)
                                    .to_string();
                            span.set_attribute("tool_arguments", args_for_span.as_str());
                            Some(span)
                        } else {
                            None
                        };

                        // Surface the pending call to the event sink unless tool tracing
                        // is disabled entirely.
                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_requested".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call requested: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                    })),
                                });
                            }
                        }

                        // The model may only call tools that were declared for this node.
                        let Some(tool_config) = request
                            .tools
                            .iter()
                            .find(|tool| tool.definition.function.name == tool_name)
                        else {
                            return Err(format!("model requested unknown tool '{}'", tool_name));
                        };

                        // Dispatch: the reserved "run_workflow_graph" tool executes a
                        // nested subworkflow; anything else goes to the custom worker,
                        // and it is an error if no worker is configured.
                        let tool_output_result = if tool_name == "run_workflow_graph" {
                            execute_subworkflow_tool_call(
                                &arguments,
                                &request.execution_context,
                                self.client,
                                self.custom_worker,
                                &self.run_options,
                                tool_span_context.as_ref(),
                                request.trace_id.as_deref(),
                            )
                            .await
                        } else if let Some(custom_worker) = self.custom_worker {
                            custom_worker
                                .execute(
                                    tool_name.as_str(),
                                    &arguments,
                                    request.email_text.as_str(),
                                    &request.execution_context,
                                )
                                .await
                        } else {
                            Err(format!(
                                "tool '{}' requested but no custom worker executor is configured",
                                tool_name
                            ))
                        };

                        // Failure path: record the error on the span, emit a failure
                        // event, push an error trace entry, close the span, and abort.
                        let tool_output = match tool_output_result {
                            Ok(output) => output,
                            Err(message) => {
                                let elapsed_ms = tool_started.elapsed().as_millis();
                                if let Some(span) = tool_span.as_mut() {
                                    span.add_event("workflow.tool.execute.error");
                                    span.set_attribute("tool_status", "error");
                                    span.set_attribute("tool_error", message.as_str());
                                    span.set_attribute(
                                        "elapsed_ms",
                                        elapsed_ms.to_string().as_str(),
                                    );
                                }
                                if request.tool_trace_mode != YamlToolTraceMode::Off {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_tool_call_failed".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(false),
                                            message: Some(message.clone()),
                                            delta: None,
                                            token_kind: None,
                                            is_terminal_node_token: None,
                                            elapsed_ms: Some(elapsed_ms),
                                            metadata: Some(json!({
                                                "tool_call_id": tool_call_id.clone(),
                                                "tool_name": tool_name.clone(),
                                            })),
                                        });
                                    }
                                }
                                tool_traces.push(YamlToolCallTrace {
                                    id: tool_call_id.clone(),
                                    name: tool_name.clone(),
                                    arguments,
                                    output: None,
                                    status: "error".to_string(),
                                    elapsed_ms,
                                    error: Some(message.clone()),
                                });
                                // End the span explicitly before returning so the error
                                // attributes set above are flushed with it.
                                if let Some(span) = tool_span.take() {
                                    span.end();
                                }
                                return Err(format!("tool '{}' failed: {}", tool_name, message));
                            }
                        };

                        // Optionally validate the tool's output against its declared
                        // JSON schema before feeding it back to the model.
                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
                            validate_schema_instance(output_schema, &tool_output).map_err(
                                |message| {
                                    format!(
                                        "tool '{}' output failed schema validation: {}",
                                        tool_name, message
                                    )
                                },
                            )?;
                        }

                        // Success path: annotate the span and emit a completion event
                        // (with trace-mode-filtered arguments and output).
                        let elapsed_ms = tool_started.elapsed().as_millis();
                        if let Some(span) = tool_span.as_mut() {
                            span.add_event("workflow.tool.execute.completed");
                            span.set_attribute("tool_status", "ok");
                            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
                            let output_for_span =
                                payload_for_tool_trace(request.tool_trace_mode, &tool_output)
                                    .to_string();
                            span.set_attribute("tool_output", output_for_span.as_str());
                        }
                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_completed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call completed: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: Some(elapsed_ms),
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
                                    })),
                                });
                            }
                        }

                        tool_traces.push(YamlToolCallTrace {
                            id: tool_call_id.clone(),
                            name: tool_name.clone(),
                            arguments: arguments.clone(),
                            output: Some(tool_output.clone()),
                            status: "ok".to_string(),
                            elapsed_ms,
                            error: None,
                        });

                        // Append the tool result to the conversation so the next
                        // roundtrip's completion can consume it.
                        conversation.push(Message::tool(
                            serde_json::to_string(&tool_output).map_err(|error| {
                                format!("failed to serialize tool output: {error}")
                            })?,
                            tool_call_id,
                        ));
                        if let Some(span) = tool_span.take() {
                            span.end();
                        }
                    }
2675
                    // All tool calls for this roundtrip succeeded; emit a summary event
                    // before looping back for the follow-up completion. `roundtrip` is
                    // zero-based (events report it 1-based) — assumes it comes from the
                    // enclosing roundtrip loop; confirm against the loop header above.
                    if request.tool_trace_mode != YamlToolTraceMode::Off {
                        if let Some(sink) = event_sink {
                            sink.emit(&YamlWorkflowEvent {
                                event_type: "node_tool_roundtrip_completed".to_string(),
                                node_id: Some(request.node_id.clone()),
                                step_id: Some(request.node_id.clone()),
                                node_kind: Some("llm_call".to_string()),
                                streamable: Some(false),
                                message: Some(format!(
                                    "tool roundtrip {} completed",
                                    roundtrip + 1
                                )),
                                delta: None,
                                token_kind: None,
                                is_terminal_node_token: None,
                                elapsed_ms: None,
                                metadata: Some(json!({
                                    "roundtrip": roundtrip + 1,
                                    "max_tool_roundtrips": request.max_tool_roundtrips,
                                })),
                            });
                        }
                    }
                }

                // The roundtrip loop ended without hitting the "no tool calls" early
                // return above — defensive guard; normally the limit check inside the
                // loop fails first.
                return Err(format!(
                    "tool-enabled llm_call '{}' exhausted loop without final payload",
                    request.node_id
                ));
            }
2706
            // Tool-free path: build a single completion request from the prepared
            // messages and dispatch it through the client.
            let mut builder = CompletionRequest::builder()
                .model(&request.model)
                .messages(messages);

            // Healed structured output is only requested for non-streaming object
            // schemas; streaming responses are healed after aggregation instead.
            if request.heal && !request.stream && expects_object {
                builder = builder.json_schema("workflow_step", request.schema.clone());
            }

            if request.stream {
                builder = builder.stream(true);
            }

            let completion_request = builder
                .build()
                .map_err(|error| format!("failed to build completion request: {error}"))?;

            // Mirror the json_schema condition above: same predicate selects the
            // HealedJson completion mode.
            let completion_options = if request.heal && !request.stream && expects_object {
                CompletionOptions {
                    mode: CompletionMode::HealedJson,
                }
            } else {
                CompletionOptions::default()
            };

            let outcome = self
                .client
                .complete(&completion_request, completion_options)
                .await
                .map_err(|error| error.to_string())?;
</outcome>
2736
            // Dispatch on how the client fulfilled the request; each arm normalizes
            // into a YamlLlmExecutionResult.
            match outcome {
                CompletionOutcome::Stream(mut stream) => {
                    // Streaming: aggregate deltas, forward them to the event sink,
                    // and track time-to-first-token plus final usage.
                    let mut aggregated = String::new();
                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
                    let stream_started = Instant::now();
                    let mut ttft_ms: Option<u128> = None;
                    // Splits structured-output deltas into "output" vs "thinking"
                    // channels when an object schema is expected.
                    let mut delta_filter = StructuredJsonDeltaFilter::default();
                    let include_raw_debug = include_raw_stream_debug_events();
                    // Optionally re-render JSON output deltas as plain text for
                    // consumer-facing streams.
                    let mut json_text_formatter = if request.stream_json_as_text {
                        Some(StreamJsonAsTextFormatter::default())
                    } else {
                        None
                    };
                    while let Some(chunk_result) = stream.next().await {
                        // Cancellation is checked both before and after processing
                        // each chunk so a cancel takes effect promptly.
                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                        let chunk = chunk_result.map_err(|error| error.to_string())?;
                        // Later usage chunks overwrite earlier ones; the final one wins.
                        if let Some(usage) = chunk.usage {
                            final_stream_usage = Some(usage);
                        }
                        if let Some(choice) = chunk.choices.first() {
                            // TTFT is stamped on the first non-empty content OR
                            // reasoning delta.
                            if ttft_ms.is_none()
                                && (choice
                                    .delta
                                    .content
                                    .as_ref()
                                    .is_some_and(|delta| !delta.is_empty())
                                    || choice
                                        .delta
                                        .reasoning_content
                                        .as_ref()
                                        .is_some_and(|delta| !delta.is_empty()))
                            {
                                ttft_ms = Some(stream_started.elapsed().as_millis());
                            }
                            // Raw-debug mode mirrors provider reasoning deltas to the
                            // sink verbatim.
                            if include_raw_debug {
                                if let Some(reasoning_delta) =
                                    choice.delta.reasoning_content.as_ref()
                                {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_thinking_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(reasoning_delta.clone()),
                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta.content.clone() {
                                // Keep the full raw text for post-stream parsing even
                                // when deltas are filtered below.
                                aggregated.push_str(delta.as_str());
                                // Structured outputs get split into output/thinking;
                                // plain text passes through unchanged.
                                let (output_delta, thinking_delta) = if expects_object {
                                    delta_filter.split(delta.as_str())
                                } else {
                                    (Some(delta.clone()), None)
                                };
                                // Optionally buffer output through the JSON-as-text
                                // formatter, which may withhold text until ready.
                                let rendered_output_delta = if let Some(output_chunk) = output_delta
                                {
                                    if let Some(formatter) = json_text_formatter.as_mut() {
                                        formatter.push(output_chunk.as_str());
                                        formatter.emit_if_ready(delta_filter.completed())
                                    } else {
                                        Some(output_chunk)
                                    }
                                } else {
                                    None
                                };
                                if include_raw_debug {
                                    if let Some(sink) = event_sink {
                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_thinking_delta"
                                                    .to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_thinking_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                        if let Some(raw_output_delta) =
                                            rendered_output_delta.as_ref()
                                        {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_output_delta".to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_output_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Output),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                    }
                                }
                                // The primary consumer-facing delta event (filtered /
                                // formatted output only).
                                if let Some(filtered_delta) = rendered_output_delta {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(filtered_delta),
                                            token_kind: Some(YamlWorkflowTokenKind::Output),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                        }

                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                    }

                    // Stream finished: parse the aggregate. Structured responses may be
                    // healed; a heal confidence triggers a "node_healed" event.
                    let payload = if expects_object {
                        let resolved =
                            parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
                        if let Some(confidence) = resolved.heal_confidence {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_healed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(true),
                                    message: Some(format!(
                                        "healed streamed structured response confidence={confidence}"
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: None,
                                });
                            }
                        }
                        resolved.payload
                    } else {
                        Value::String(aggregated)
                    };

                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
                            prompt_tokens: usage.prompt_tokens,
                            completion_tokens: usage.completion_tokens,
                            total_tokens: usage.total_tokens,
                            reasoning_tokens: usage.reasoning_tokens,
                        }),
                        ttft_ms,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::Response(response) => {
                    // Non-streaming response: structured schemas require non-empty,
                    // strictly valid JSON content; plain text tolerates empty content
                    // (treated as "").
                    let payload = if expects_object {
                        let content = response
                            .content()
                            .ok_or_else(|| "completion returned empty content".to_string())?;
                        serde_json::from_str(content).map_err(|error| {
                            format!("failed to parse structured completion JSON: {error}")
                        })?
                    } else {
                        Value::String(response.content().unwrap_or_default().to_string())
                    };

                    // ttft is only meaningful for streams, hence None here.
                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: response.usage.prompt_tokens,
                            completion_tokens: response.usage.completion_tokens,
                            total_tokens: response.usage.total_tokens,
                            reasoning_tokens: response.usage.reasoning_tokens,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::HealedJson(healed) => {
                    // Healed JSON is only requested for object schemas (see the
                    // completion_options construction); anything else is a contract
                    // violation.
                    if !expects_object {
                        return Err(
                            "healed json outcome is unsupported for non-object schema".to_string()
                        );
                    }
                    // Let observers know healing was applied and how confident the
                    // parser was.
                    if let Some(sink) = event_sink {
                        sink.emit(&YamlWorkflowEvent {
                            event_type: "node_healed".to_string(),
                            node_id: Some(request.node_id.clone()),
                            step_id: Some(request.node_id.clone()),
                            node_kind: Some("llm_call".to_string()),
                            streamable: Some(request.stream),
                            message: Some(format!(
                                "healed structured response confidence={}",
                                healed.parsed.confidence
                            )),
                            delta: None,
                            token_kind: None,
                            is_terminal_node_token: None,
                            elapsed_ms: None,
                            metadata: None,
                        });
                    }
                    Ok(YamlLlmExecutionResult {
                        payload: healed.parsed.value,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: healed.response.usage.prompt_tokens,
                            completion_tokens: healed.response.usage.completion_tokens,
                            total_tokens: healed.response.usage.total_tokens,
                            reasoning_tokens: healed.response.usage.reasoning_tokens,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
2977                CompletionOutcome::CoercedSchema(coerced) => {
2978                    if !expects_object {
2979                        return Err(
2980                            "coerced schema outcome is unsupported for non-object schema"
2981                                .to_string(),
2982                        );
2983                    }
2984                    Ok(YamlLlmExecutionResult {
2985                        payload: coerced.coerced.value,
2986                        usage: Some(YamlLlmTokenUsage {
2987                            prompt_tokens: coerced.response.usage.prompt_tokens,
2988                            completion_tokens: coerced.response.usage.completion_tokens,
2989                            total_tokens: coerced.response.usage.total_tokens,
2990                            reasoning_tokens: coerced.response.usage.reasoning_tokens,
2991                        }),
2992                        ttft_ms: None,
2993                        tool_calls: Vec::new(),
2994                    })
2995                }
2996            }
2997        }
2998    }
2999
3000    let executor = BorrowedClientExecutor {
3001        client,
3002        custom_worker,
3003        run_options: options.clone(),
3004    };
3005    run_workflow_yaml_with_custom_worker_and_events_and_options(
3006        workflow,
3007        workflow_input,
3008        &executor,
3009        custom_worker,
3010        event_sink,
3011        options,
3012    )
3013    .await
3014}
3015
3016pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
3017    workflow: &YamlWorkflow,
3018    email_text: &str,
3019    client: &SimpleAgentsClient,
3020    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3021    event_sink: Option<&dyn YamlWorkflowEventSink>,
3022) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3023    let workflow_input = json!({ "email_text": email_text });
3024    run_workflow_yaml_with_client_and_custom_worker_and_events(
3025        workflow,
3026        &workflow_input,
3027        client,
3028        custom_worker,
3029        event_sink,
3030    )
3031    .await
3032}
3033
3034pub async fn run_workflow_yaml(
3035    workflow: &YamlWorkflow,
3036    workflow_input: &Value,
3037    executor: &dyn YamlWorkflowLlmExecutor,
3038) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3039    run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3040        .await
3041}
3042
3043pub async fn run_email_workflow_yaml(
3044    workflow: &YamlWorkflow,
3045    email_text: &str,
3046    executor: &dyn YamlWorkflowLlmExecutor,
3047) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3048    let workflow_input = json!({ "email_text": email_text });
3049    run_workflow_yaml(workflow, &workflow_input, executor).await
3050}
3051
3052pub async fn run_workflow_yaml_with_custom_worker(
3053    workflow: &YamlWorkflow,
3054    workflow_input: &Value,
3055    executor: &dyn YamlWorkflowLlmExecutor,
3056    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3057) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3058    run_workflow_yaml_with_custom_worker_and_events(
3059        workflow,
3060        workflow_input,
3061        executor,
3062        custom_worker,
3063        None,
3064    )
3065    .await
3066}
3067
3068pub async fn run_email_workflow_yaml_with_custom_worker(
3069    workflow: &YamlWorkflow,
3070    email_text: &str,
3071    executor: &dyn YamlWorkflowLlmExecutor,
3072    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3073) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3074    let workflow_input = json!({ "email_text": email_text });
3075    run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3076}
3077
3078pub async fn run_workflow_yaml_with_custom_worker_and_events(
3079    workflow: &YamlWorkflow,
3080    workflow_input: &Value,
3081    executor: &dyn YamlWorkflowLlmExecutor,
3082    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3083    event_sink: Option<&dyn YamlWorkflowEventSink>,
3084) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3085    run_workflow_yaml_with_custom_worker_and_events_and_options(
3086        workflow,
3087        workflow_input,
3088        executor,
3089        custom_worker,
3090        event_sink,
3091        &YamlWorkflowRunOptions::default(),
3092    )
3093    .await
3094}
3095
/// Interprets a YAML-defined workflow graph, with full control over the LLM
/// executor, optional custom-worker handler, event sink, and run options.
///
/// Phases:
/// 1. Validate the input object, telemetry sample rate, and workflow
///    diagnostics (any error-severity diagnostic aborts the run).
/// 2. Attempt the IR-runtime fast path; fall through to the interpreter only
///    when the workflow cannot be lowered.
/// 3. Walk the graph node-by-node (`llm_call` / `switch` / `custom_worker`),
///    accumulating per-node outputs, mutable globals, step timings, and token
///    totals, emitting lifecycle events and spans along the way.
/// 4. Assemble the final [`YamlWorkflowRunOutput`] from the terminal node.
///
/// Cancellation is cooperative: `event_sink_is_cancelled` is polled before and
/// after every observable step and aborts with `EventSinkCancelled`.
///
/// # Errors
/// `InvalidInput`, `Validation`, `EmptyNodes`, `MissingEntry`, `MissingNode`,
/// `UnsupportedNodeType`, `Llm`, `CustomWorker`, `InvalidSwitchTarget`, or
/// `EventSinkCancelled`, depending on the failing phase.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    // The input must be a JSON object so `input.*` template bindings can
    // resolve against named fields.
    if !workflow_input.is_object() {
        return Err(YamlWorkflowRunError::InvalidInput {
            message: "workflow input must be a JSON object".to_string(),
        });
    }

    validate_sample_rate(options.telemetry.sample_rate)?;

    // Legacy convenience field: a missing `email_text` degrades to "".
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default();

    // Fail fast on error-severity diagnostics; lesser severities are ignored.
    let diagnostics = verify_yaml_workflow(workflow);
    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
        .iter()
        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
        .cloned()
        .collect();
    if !errors.is_empty() {
        return Err(YamlWorkflowRunError::Validation {
            diagnostics_count: errors.len(),
            diagnostics: errors,
        });
    }

    // Fast path: if the workflow lowers to the IR runtime, run it there and
    // skip the interpreter below entirely.
    if let Some(output) =
        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
            .await?
    {
        return Ok(output);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Root "workflow.run" span; only created when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // NOTE(review): the error returns from here on drop `workflow_span`
    // without an explicit `end()`/flush — confirm the tracer finalizes
    // unended spans on drop, otherwise early failures lose the root span.
    if workflow.nodes.is_empty() {
        return Err(YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        });
    }

    // id -> node index for O(1) traversal.
    let node_map: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();
    if !node_map.contains_key(workflow.entry_node.as_str()) {
        return Err(YamlWorkflowRunError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Single outgoing edge per node; nodes absent from this map are terminal.
    // NOTE(review): duplicate `from` ids silently keep only the last edge —
    // presumably rejected earlier by `verify_yaml_workflow`; confirm.
    let edge_map: HashMap<&str, &str> = workflow
        .edges
        .iter()
        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
        .collect();

    // Interpreter state accumulated across the node loop.
    let mut current = workflow.entry_node.clone();
    let mut trace = Vec::new();
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    let mut globals = serde_json::Map::new();
    let mut step_timings = Vec::new();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    let mut llm_node_models: BTreeMap<String, String> = BTreeMap::new();
    let mut token_totals = YamlTokenTotals::default();
    let mut workflow_ttft_ms: Option<u128> = None;
    let started = Instant::now();

    if let Some(sink) = event_sink {
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_started".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("workflow_id={}", workflow.id)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(0),
            metadata: None,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    loop {
        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let node =
            *node_map
                .get(current.as_str())
                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
                    node_id: current.clone(),
                })?;

        trace.push(node.id.clone());
        let step_started = Instant::now();

        // Per-node span, parented to the workflow span when sampled.
        let mut node_span_context: Option<TraceContext> = None;
        let mut node_span = if telemetry_context.sampled {
            let (span_context, mut span) = tracer.start_span(
                "workflow.node.execute",
                SpanKind::Node,
                workflow_span_context.as_ref(),
            );
            node_span_context = Some(span_context);
            span.set_attribute(
                "trace_id",
                telemetry_context.trace_id.as_deref().unwrap_or_default(),
            );
            span.set_attribute("node_id", node.id.as_str());
            span.set_attribute("node_kind", node.kind_name());
            Some(span)
        } else {
            None
        };

        // Streamable only for llm_call nodes that requested streaming AND did
        // not request healing (healing needs the complete response body).
        let node_streamable = node
            .node_type
            .llm_call
            .as_ref()
            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_started".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: if node_streamable == Some(false) {
                    Some("Node is not streamable; status events only".to_string())
                } else {
                    None
                },
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(workflow_elapsed_before_node_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let mut node_usage: Option<YamlLlmTokenUsage> = None;
        let mut node_model_name: Option<String> = None;
        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
        // Dispatch on node type; each branch evaluates to the next node id
        // (None terminates the loop after bookkeeping below).
        let next = if let Some(llm) = &node.node_type.llm_call {
            // Template context visible to prompts: raw input, prior node
            // outputs, and the current globals snapshot.
            let prompt_template = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.prompt.as_deref())
                .unwrap_or_default();
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            // Optional explicit message list pulled from the context by path.
            let messages = if let Some(path) = llm.messages_path.as_deref() {
                Some(
                    parse_messages_from_context(path, &context).map_err(|message| {
                        YamlWorkflowRunError::Llm {
                            node_id: node.id.clone(),
                            message,
                        }
                    })?,
                )
            } else {
                None
            };
            let prompt_bindings = collect_template_bindings(prompt_template, &context);
            let prompt = interpolate_template(prompt_template, &context);
            let schema = llm_output_schema_for_node(node);

            // Assemble the full execution request; per-node YAML knobs fall
            // back to their documented defaults via `unwrap_or`.
            let request = YamlLlmExecutionRequest {
                node_id: node.id.clone(),
                is_terminal_node,
                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
                model: resolve_requested_model(options.model.as_deref(), &llm.model),
                messages,
                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
                prompt,
                prompt_template: prompt_template.to_string(),
                prompt_bindings,
                schema,
                stream: llm.stream.unwrap_or(false),
                heal: llm.heal.unwrap_or(false),
                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?,
                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
                    YamlWorkflowRunError::Llm {
                        node_id: node.id.clone(),
                        message,
                    }
                })?,
                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
                tool_calls_global_key: llm.tool_calls_global_key.clone(),
                tool_trace_mode: options.telemetry.tool_trace_mode,
                execution_context: context.clone(),
                email_text: email_text.to_string(),
                trace_id: telemetry_context.trace_id.clone(),
                trace_context: node_span_context.clone(),
                trace_sampled: telemetry_context.sampled,
            };

            if let Some(span) = node_span.as_mut() {
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &context).as_str(),
                );
            }

            // Telemetry event describing the fully-resolved LLM input.
            if let Some(sink) = event_sink {
                sink.emit(&YamlWorkflowEvent {
                    event_type: "node_llm_input_resolved".to_string(),
                    node_id: Some(node.id.clone()),
                    step_id: Some(node.id.clone()),
                    node_kind: Some("llm_call".to_string()),
                    streamable: Some(request.stream),
                    message: Some("resolved llm input for telemetry".to_string()),
                    delta: None,
                    token_kind: None,
                    is_terminal_node_token: None,
                    elapsed_ms: Some(started.elapsed().as_millis()),
                    metadata: Some(json!({
                        "model": request.model.clone(),
                        "stream_requested": request.stream,
                        "stream_json_as_text": request.stream_json_as_text,
                        "heal_requested": request.heal,
                        "effective_stream": request.stream,
                        "prompt_template": request.prompt_template.clone(),
                        "prompt": request.prompt.clone(),
                        "schema": request.schema.clone(),
                        "bindings": request.prompt_bindings.clone(),
                        "tools_count": request.tools.len(),
                        "max_tool_roundtrips": request.max_tool_roundtrips,
                    })),
                });
            }

            node_model_name = Some(request.model.clone());
            llm_node_models.insert(node.id.clone(), request.model.clone());

            if event_sink_is_cancelled(event_sink) {
                return Err(YamlWorkflowRunError::EventSinkCancelled {
                    message: workflow_event_sink_cancelled_message().to_string(),
                });
            }

            let llm_result = executor
                .complete_structured(request, event_sink)
                .await
                .map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?;

            if let Some(usage) = llm_result.usage.as_ref() {
                token_totals.add_usage(usage);
            }
            // Workflow-level TTFT = first node-level TTFT offset by the time
            // spent before that node started; only the first LLM node counts.
            if workflow_ttft_ms.is_none() {
                workflow_ttft_ms = llm_result
                    .ttft_ms
                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
            }
            node_usage = llm_result.usage;

            let payload = llm_result.payload;
            let tool_calls = llm_result.tool_calls;

            // Node output shape is `{ "output": ... }`, plus a `tool_calls`
            // array when any tool calls were made.
            let mut node_output = json!({ "output": payload });
            if !tool_calls.is_empty() {
                if let Some(output_obj) = node_output.as_object_mut() {
                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
                }
            }
            outputs.insert(node.id.clone(), node_output);
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            // Globals mutations happen after this node's output is recorded,
            // so set/update expressions can reference it.
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            // Optionally mirror this node's tool calls into a named global.
            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if let Some(node_tool_calls) = outputs
                    .get(node.id.as_str())
                    .and_then(|value| value.get("tool_calls"))
                    .cloned()
                {
                    globals.insert(global_key.clone(), node_tool_calls);
                }
            }
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else if let Some(switch) = &node.node_type.switch {
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            // First matching branch wins; otherwise fall back to the default.
            let mut chosen = Some(switch.default.clone());
            for branch in &switch.branches {
                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
                    chosen = Some(branch.target.clone());
                    break;
                }
            }
            // NOTE(review): `chosen` starts as Some(default), so this error
            // path appears unreachable as written — confirm whether `default`
            // was ever meant to be optional.
            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
                node_id: node.id.clone(),
            })?;
            Some(chosen)
        } else if let Some(custom) = &node.node_type.custom_worker {
            // Static payload from node config; defaults to an empty object.
            let payload = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.payload.as_ref())
                .cloned()
                .unwrap_or_else(|| json!({}));
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });

            if let Some(span) = node_span.as_mut() {
                span.set_attribute("handler_name", custom.handler.as_str());
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
                );
            }

            // Dedicated "handler.invoke" span around the worker call.
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if telemetry_context.sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    workflow_span_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                span.set_attribute(
                    "trace_id",
                    telemetry_context.trace_id.as_deref().unwrap_or_default(),
                );
                span.set_attribute("handler_name", custom.handler.as_str());
                apply_trace_tenant_attributes(span.as_mut(), options);
                Some(span)
            } else {
                None
            };

            // Propagate trace/tenant context into the worker's view.
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                telemetry_context.trace_id.as_deref(),
                options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &options.trace.tenant,
            );

            // Without a registered executor, fall back to the mock worker so
            // workflows stay runnable without real handlers.
            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
                custom_worker_executor
                    .execute(
                        custom.handler.as_str(),
                        &payload,
                        email_text,
                        &worker_context,
                    )
                    .await
                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
                        node_id: node.id.clone(),
                        message,
                    })
            } else {
                mock_custom_worker_output(custom.handler.as_str(), &payload)
            };

            // End the handler span before propagating any worker error, so a
            // failed handler still closes its span.
            if let Some(span) = handler_span.take() {
                span.end();
            }

            let worker_output = worker_output_result?;

            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else {
            return Err(YamlWorkflowRunError::UnsupportedNodeType {
                node_id: node.id.clone(),
            });
        };

        // Per-step timing; token columns are populated only for LLM nodes.
        let node_kind = node.kind_name().to_string();
        let elapsed_ms = step_started.elapsed().as_millis();
        step_timings.push(YamlStepTiming {
            node_id: node.id.clone(),
            node_kind,
            model_name: node_model_name,
            elapsed_ms,
            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
            reasoning_tokens: node_usage.as_ref().and_then(|usage| usage.reasoning_tokens),
            tokens_per_second: node_usage
                .as_ref()
                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
        });

        if let Some(usage) = node_usage.as_ref() {
            llm_node_metrics.insert(
                node.id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(
                        usage.completion_tokens,
                        elapsed_ms,
                    ),
                },
            );
        }

        if let Some(mut span) = node_span.take() {
            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
            span.add_event("node_completed");
            span.end();
        }

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_completed".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: None,
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(elapsed_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Follow the outgoing edge, or stop at a terminal node.
        if let Some(next) = next {
            current = next;
            continue;
        }
        break;
    }

    // The last visited node is the terminal node; its `output` field becomes
    // the workflow-level result.
    let terminal_node = trace
        .last()
        .cloned()
        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        })?;

    let terminal_output = outputs
        .get(terminal_node.as_str())
        .and_then(|value| value.get("output"))
        .cloned();

    let total_elapsed_ms = started.elapsed().as_millis();
    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text: email_text.to_string(),
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: workflow_ttft_ms,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    if let Some(sink) = event_sink {
        // Attach nerdstats to the completion event only when enabled.
        let event_metadata = if options.telemetry.nerdstats {
            Some(json!({
                "nerdstats": workflow_nerdstats(&output),
            }))
        } else {
            None
        };
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_completed".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("terminal_node={}", output.terminal_node)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(output.total_elapsed_ms),
            metadata: event_metadata,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Close and flush the root span only when one was started (sampled runs).
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    Ok(output)
}
3701
/// Attempts to execute a YAML workflow through the shared IR `WorkflowRuntime`
/// instead of the YAML-specific interpreter.
///
/// Returns `Ok(None)` when the workflow cannot be lowered to IR (unsupported
/// node kinds or multiple outgoing edges) or when the IR runtime rejects it
/// with a validation error — presumably so the caller falls back to another
/// execution path (NOTE(review): confirm against the caller). Any other
/// lowering or runtime failure is a hard error; success yields the fully
/// assembled `YamlWorkflowRunOutput`.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lowering failures that mean "this shape is not supported on the IR
    // path" are a soft signal (Ok(None)); everything else is a hard error.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open a workflow-level span only when this run is sampled; its span
    // context is also handed to the tool executor so nested handler spans
    // parent correctly.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // The IR runtime requires an `LlmExecutor`, but on this path every LLM
    // node is expressed as a call of the `YAML_LLM_TOOL_ID` pseudo-tool and
    // handled by the tool executor below — reaching this executor is a bug.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Bridges IR tool calls back to the YAML LLM executor / custom worker.
    // The metric maps are mutex-guarded because `execute_tool` only receives
    // `&self`; they are drained after the run completes.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        // Run-wide token totals across all LLM pseudo-tool calls.
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        // Per-node token usage, keyed by node id.
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        // Per-node resolved model name, keyed by node id.
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // Rebuild the YAML templating context ({input, nodes, globals})
            // from the IR-scoped input.
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // LLM pseudo-tool: reconstruct a structured LLM request from the
            // tool-call payload produced during IR lowering.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                // A run-level model override takes precedence over the node's
                // declared model.
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // When a messages_path is given, the message list must
                // resolve from the context; failure aborts the tool call.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // Tool use is intentionally disabled on this path (empty
                // tools, single roundtrip, trace mode off).
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // On success, fold token usage into the run totals and record
                // per-node usage/model. Poisoned locks are silently skipped
                // (metrics are best-effort).
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Every other tool id must be served by the custom worker, if any.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Wrap the handler invocation in its own span (sampled runs only),
            // annotated with trace/tenant attributes and the (possibly
            // redacted, per payload_mode) node input.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                if let Some(trace_id) = self.trace_id.as_ref() {
                    span.set_attribute("trace_id", trace_id.as_str());
                }
                span.set_attribute("handler_name", input.tool.as_str());
                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
                    span.set_attribute("tenant.workspace_id", workspace_id);
                }
                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
                    span.set_attribute("tenant.user_id", user_id);
                }
                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
                    span.set_attribute("tenant.conversation_id", conversation_id);
                }
                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
                    span.set_attribute("tenant.request_id", request_id);
                }
                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
                    span.set_attribute("tenant.run_id", run_id);
                }
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Propagate trace/tenant context into the worker call so the
            // worker can continue the trace.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            // Mark the span outcome before closing it.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    let runtime = WorkflowRuntime::new(
        ir,
        &NoopLlm,
        Some(&tool_executor),
        WorkflowRuntimeOptions::default(),
    );

    let started = Instant::now();
    // IR validation failures are treated like "unsupported" — signal the
    // caller to fall back rather than failing the run.
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-wrap each node output as {"output": ...} to match the YAML context
    // shape, skipping the synthetic start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        let node_id = execution.node_id;
        trace.push(node_id.clone());
        let usage = node_usage_map.get(&node_id);
        if let Some(usage) = usage {
            // Per-node elapsed time is not tracked on this path, so metrics
            // record 0 ms and tokens/sec is computed with a 0 duration.
            llm_node_metrics.insert(
                node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: node_id.clone(),
            node_kind: "ir_runtime".to_string(),
            model_name: llm_node_models.get(&node_id).cloned(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    // Close the workflow span (if sampled) and flush the tracer.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    }))
}
4133
4134fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4135    let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4136
4137    let mut nodes = serde_json::Map::new();
4138    if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4139        for (node_id, output) in node_outputs {
4140            nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4141        }
4142    }
4143
4144    json!({
4145        "input": input,
4146        "nodes": Value::Object(nodes),
4147        "globals": Value::Object(serde_json::Map::new())
4148    })
4149}
4150
4151pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4152    workflow: &YamlWorkflow,
4153    email_text: &str,
4154    executor: &dyn YamlWorkflowLlmExecutor,
4155    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4156    event_sink: Option<&dyn YamlWorkflowEventSink>,
4157) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4158    let workflow_input = json!({ "email_text": email_text });
4159    run_workflow_yaml_with_custom_worker_and_events(
4160        workflow,
4161        &workflow_input,
4162        executor,
4163        custom_worker,
4164        event_sink,
4165    )
4166    .await
4167}
4168
4169fn evaluate_switch_condition(
4170    condition: &str,
4171    context: &Value,
4172) -> Result<bool, YamlWorkflowRunError> {
4173    let (left, right) =
4174        condition
4175            .split_once("==")
4176            .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4177                condition: condition.to_string(),
4178            })?;
4179
4180    let left_path = left.trim().trim_start_matches("$.");
4181    let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4182    let left_value = resolve_path(context, left_path);
4183    Ok(left_value
4184        .and_then(Value::as_str)
4185        .map(|value| value == right_literal)
4186        .unwrap_or(false))
4187}
4188
4189fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4190    let normalized_path = path.trim().trim_start_matches("$.");
4191    let value = resolve_path(context, normalized_path)
4192        .ok_or_else(|| format!("messages_path not found: {path}"))?;
4193    let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4194        format!("messages_path must resolve to a list of messages: {path}; {err}")
4195    })?;
4196    if list.is_empty() {
4197        return Err(format!(
4198            "messages_path must not resolve to an empty list: {path}"
4199        ));
4200    }
4201
4202    let mut messages = Vec::with_capacity(list.len());
4203    for (index, item) in list.into_iter().enumerate() {
4204        let mut message = match item.role {
4205            Role::System => Message::system(item.content),
4206            Role::User => Message::user(item.content),
4207            Role::Assistant => Message::assistant(item.content),
4208            Role::Tool => {
4209                let tool_call_id = item
4210                    .tool_call_id
4211                    .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4212                Message::tool(item.content, tool_call_id)
4213            }
4214        };
4215
4216        if let Some(name) = item.name {
4217            message = message.with_name(name);
4218        }
4219
4220        messages.push(message);
4221    }
4222
4223    Ok(messages)
4224}
4225
4226pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
4227    let mut diagnostics = Vec::new();
4228    let known_ids: HashMap<&str, &YamlNode> = workflow
4229        .nodes
4230        .iter()
4231        .map(|node| (node.id.as_str(), node))
4232        .collect();
4233
4234    if !known_ids.contains_key(workflow.entry_node.as_str()) {
4235        diagnostics.push(YamlWorkflowDiagnostic {
4236            node_id: None,
4237            code: "missing_entry".to_string(),
4238            severity: YamlWorkflowDiagnosticSeverity::Error,
4239            message: format!("entry node '{}' does not exist", workflow.entry_node),
4240        });
4241    }
4242
4243    for edge in &workflow.edges {
4244        if !known_ids.contains_key(edge.from.as_str()) {
4245            diagnostics.push(YamlWorkflowDiagnostic {
4246                node_id: Some(edge.from.clone()),
4247                code: "unknown_edge_from".to_string(),
4248                severity: YamlWorkflowDiagnosticSeverity::Error,
4249                message: format!("edge.from '{}' does not exist", edge.from),
4250            });
4251        }
4252        if !known_ids.contains_key(edge.to.as_str()) {
4253            diagnostics.push(YamlWorkflowDiagnostic {
4254                node_id: Some(edge.to.clone()),
4255                code: "unknown_edge_to".to_string(),
4256                severity: YamlWorkflowDiagnosticSeverity::Error,
4257                message: format!("edge.to '{}' does not exist", edge.to),
4258            });
4259        }
4260    }
4261
4262    for node in &workflow.nodes {
4263        if let Some(llm) = &node.node_type.llm_call {
4264            if llm.model.trim().is_empty() {
4265                diagnostics.push(YamlWorkflowDiagnostic {
4266                    node_id: Some(node.id.clone()),
4267                    code: "empty_model".to_string(),
4268                    severity: YamlWorkflowDiagnosticSeverity::Error,
4269                    message: "llm_call.model must not be empty".to_string(),
4270                });
4271            }
4272            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
4273                diagnostics.push(YamlWorkflowDiagnostic {
4274                    node_id: Some(node.id.clone()),
4275                    code: "stream_heal_conflict".to_string(),
4276                    severity: YamlWorkflowDiagnosticSeverity::Warning,
4277                    message:
4278                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
4279                            .to_string(),
4280                });
4281            }
4282
4283            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
4284                diagnostics.push(YamlWorkflowDiagnostic {
4285                    node_id: Some(node.id.clone()),
4286                    code: "invalid_max_tool_roundtrips".to_string(),
4287                    severity: YamlWorkflowDiagnosticSeverity::Error,
4288                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
4289                });
4290            }
4291
4292            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
4293                if global_key.trim().is_empty() {
4294                    diagnostics.push(YamlWorkflowDiagnostic {
4295                        node_id: Some(node.id.clone()),
4296                        code: "empty_tool_calls_global_key".to_string(),
4297                        severity: YamlWorkflowDiagnosticSeverity::Error,
4298                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
4299                    });
4300                }
4301            }
4302
4303            match normalize_tool_choice(llm.tool_choice.clone()) {
4304                Ok(choice) => {
4305                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
4306                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
4307                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4308                                openai.function.name == choice_tool.function.name
4309                            }
4310                            (
4311                                YamlToolFormat::Simplified,
4312                                YamlToolDeclaration::Simplified(simple),
4313                            ) => simple.name == choice_tool.function.name,
4314                            _ => false,
4315                        }) {
4316                            diagnostics.push(YamlWorkflowDiagnostic {
4317                                node_id: Some(node.id.clone()),
4318                                code: "unknown_tool_choice_function".to_string(),
4319                                severity: YamlWorkflowDiagnosticSeverity::Error,
4320                                message: format!(
4321                                    "llm_call.tool_choice references unknown function '{}'",
4322                                    choice_tool.function.name
4323                                ),
4324                            });
4325                        }
4326                    }
4327                }
4328                Err(message) => {
4329                    diagnostics.push(YamlWorkflowDiagnostic {
4330                        node_id: Some(node.id.clone()),
4331                        code: "invalid_tool_choice".to_string(),
4332                        severity: YamlWorkflowDiagnosticSeverity::Error,
4333                        message,
4334                    });
4335                }
4336            }
4337
4338            let normalized_tools = match normalize_llm_tools(llm) {
4339                Ok(tools) => tools,
4340                Err(message) => {
4341                    diagnostics.push(YamlWorkflowDiagnostic {
4342                        node_id: Some(node.id.clone()),
4343                        code: "invalid_tools_format".to_string(),
4344                        severity: YamlWorkflowDiagnosticSeverity::Error,
4345                        message,
4346                    });
4347                    Vec::new()
4348                }
4349            };
4350
4351            let mut seen_tool_names = HashSet::new();
4352            for tool in &normalized_tools {
4353                let name = tool.definition.function.name.trim();
4354                if name.is_empty() {
4355                    diagnostics.push(YamlWorkflowDiagnostic {
4356                        node_id: Some(node.id.clone()),
4357                        code: "empty_tool_name".to_string(),
4358                        severity: YamlWorkflowDiagnosticSeverity::Error,
4359                        message: "tool function name must not be empty".to_string(),
4360                    });
4361                }
4362                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
4363                    diagnostics.push(YamlWorkflowDiagnostic {
4364                        node_id: Some(node.id.clone()),
4365                        code: "duplicate_tool_name".to_string(),
4366                        severity: YamlWorkflowDiagnosticSeverity::Error,
4367                        message: format!(
4368                            "duplicate tool function name '{}' in node",
4369                            tool.definition.function.name
4370                        ),
4371                    });
4372                }
4373
4374                let schema = tool
4375                    .definition
4376                    .function
4377                    .parameters
4378                    .clone()
4379                    .unwrap_or(Value::Null);
4380                if schema.is_null() {
4381                    diagnostics.push(YamlWorkflowDiagnostic {
4382                        node_id: Some(node.id.clone()),
4383                        code: "missing_tool_input_schema".to_string(),
4384                        severity: YamlWorkflowDiagnosticSeverity::Error,
4385                        message: format!(
4386                            "tool '{}' is missing input schema",
4387                            tool.definition.function.name
4388                        ),
4389                    });
4390                } else if let Err(message) = validate_json_schema(&schema) {
4391                    diagnostics.push(YamlWorkflowDiagnostic {
4392                        node_id: Some(node.id.clone()),
4393                        code: "invalid_tool_input_schema".to_string(),
4394                        severity: YamlWorkflowDiagnosticSeverity::Error,
4395                        message: format!(
4396                            "tool '{}' has invalid input schema: {}",
4397                            tool.definition.function.name, message
4398                        ),
4399                    });
4400                }
4401
4402                if let Some(output_schema) = tool.output_schema.as_ref() {
4403                    if let Err(message) = validate_json_schema(output_schema) {
4404                        diagnostics.push(YamlWorkflowDiagnostic {
4405                            node_id: Some(node.id.clone()),
4406                            code: "invalid_tool_output_schema".to_string(),
4407                            severity: YamlWorkflowDiagnosticSeverity::Error,
4408                            message: format!(
4409                                "tool '{}' has invalid output schema: {}",
4410                                tool.definition.function.name, message
4411                            ),
4412                        });
4413                    }
4414                }
4415            }
4416        }
4417
4418        if let Some(switch) = &node.node_type.switch {
4419            for branch in &switch.branches {
4420                if !known_ids.contains_key(branch.target.as_str()) {
4421                    diagnostics.push(YamlWorkflowDiagnostic {
4422                        node_id: Some(node.id.clone()),
4423                        code: "unknown_switch_target".to_string(),
4424                        severity: YamlWorkflowDiagnosticSeverity::Error,
4425                        message: format!("switch branch target '{}' does not exist", branch.target),
4426                    });
4427                }
4428            }
4429            if !known_ids.contains_key(switch.default.as_str()) {
4430                diagnostics.push(YamlWorkflowDiagnostic {
4431                    node_id: Some(node.id.clone()),
4432                    code: "unknown_switch_default".to_string(),
4433                    severity: YamlWorkflowDiagnosticSeverity::Error,
4434                    message: format!("switch default target '{}' does not exist", switch.default),
4435                });
4436            }
4437        }
4438
4439        if let Some(config) = node.config.as_ref() {
4440            if let Some(update_globals) = config.update_globals.as_ref() {
4441                for (key, update) in update_globals {
4442                    let is_valid_op =
4443                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
4444                    if !is_valid_op {
4445                        diagnostics.push(YamlWorkflowDiagnostic {
4446                            node_id: Some(node.id.clone()),
4447                            code: "unknown_update_op".to_string(),
4448                            severity: YamlWorkflowDiagnosticSeverity::Error,
4449                            message: format!(
4450                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
4451                                key, update.op
4452                            ),
4453                        });
4454                    }
4455
4456                    if update.op != "increment" && update.from.is_none() {
4457                        diagnostics.push(YamlWorkflowDiagnostic {
4458                            node_id: Some(node.id.clone()),
4459                            code: "missing_update_from".to_string(),
4460                            severity: YamlWorkflowDiagnosticSeverity::Error,
4461                            message: format!(
4462                                "update_globals key '{}' with op '{}' requires 'from'",
4463                                key, update.op
4464                            ),
4465                        });
4466                    }
4467                }
4468            }
4469        }
4470    }
4471
4472    diagnostics
4473}
4474
4475fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4476    path.split('.')
4477        .filter(|segment| !segment.is_empty())
4478        .try_fold(value, |current, segment| {
4479            if let Ok(index) = segment.parse::<usize>() {
4480                return current.get(index);
4481            }
4482            current.get(segment)
4483        })
4484}
4485
4486fn interpolate_template(template: &str, context: &Value) -> String {
4487    let mut out = String::with_capacity(template.len());
4488    let mut rest = template;
4489
4490    loop {
4491        let Some(start) = rest.find("{{") else {
4492            out.push_str(rest);
4493            break;
4494        };
4495
4496        out.push_str(&rest[..start]);
4497        let after_start = &rest[start + 2..];
4498        let Some(end) = after_start.find("}}") else {
4499            out.push_str(&rest[start..]);
4500            break;
4501        };
4502
4503        let expr = after_start[..end].trim();
4504        let source_path = expr.trim_start_matches("$.");
4505        let replacement = resolve_path(context, source_path)
4506            .map(value_to_template_string)
4507            .unwrap_or_default();
4508        out.push_str(replacement.as_str());
4509
4510        rest = &after_start[end + 2..];
4511    }
4512
4513    out
4514}
4515
4516fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4517    let mut bindings = Vec::new();
4518    let mut rest = template;
4519
4520    loop {
4521        let Some(start) = rest.find("{{") else {
4522            break;
4523        };
4524
4525        let after_start = &rest[start + 2..];
4526        let Some(end) = after_start.find("}}") else {
4527            break;
4528        };
4529
4530        let expr = after_start[..end].trim();
4531        let source_path = expr.trim_start_matches("$.").to_string();
4532        let resolved = resolve_path(context, source_path.as_str()).cloned();
4533        let missing = resolved.is_none();
4534        let resolved_value = resolved.unwrap_or(Value::Null);
4535        bindings.push(YamlTemplateBinding {
4536            index: bindings.len(),
4537            expression: expr.to_string(),
4538            source_path,
4539            resolved_type: json_type_name(&resolved_value).to_string(),
4540            missing,
4541            resolved: resolved_value,
4542        });
4543
4544        rest = &after_start[end + 2..];
4545    }
4546
4547    bindings
4548}
4549
4550fn json_type_name(value: &Value) -> &'static str {
4551    match value {
4552        Value::Null => "null",
4553        Value::Bool(_) => "bool",
4554        Value::Number(_) => "number",
4555        Value::String(_) => "string",
4556        Value::Array(_) => "array",
4557        Value::Object(_) => "object",
4558    }
4559}
4560
4561fn value_to_template_string(value: &Value) -> String {
4562    match value {
4563        Value::Null => String::new(),
4564        Value::Bool(v) => v.to_string(),
4565        Value::Number(v) => v.to_string(),
4566        Value::String(v) => v.clone(),
4567        Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4568    }
4569}
4570
4571fn apply_set_globals(
4572    node: &YamlNode,
4573    outputs: &BTreeMap<String, Value>,
4574    workflow_input: &Value,
4575    globals: &mut serde_json::Map<String, Value>,
4576) {
4577    let Some(config) = node.config.as_ref() else {
4578        return;
4579    };
4580    let Some(set_globals) = config.set_globals.as_ref() else {
4581        return;
4582    };
4583
4584    let context = json!({
4585        "input": workflow_input,
4586        "nodes": outputs,
4587        "globals": Value::Object(globals.clone())
4588    });
4589
4590    for (key, expr) in set_globals {
4591        let value = resolve_path(&context, expr.as_str())
4592            .cloned()
4593            .unwrap_or(Value::Null);
4594        globals.insert(key.clone(), value);
4595    }
4596}
4597
4598fn apply_update_globals(
4599    node: &YamlNode,
4600    outputs: &BTreeMap<String, Value>,
4601    workflow_input: &Value,
4602    globals: &mut serde_json::Map<String, Value>,
4603) {
4604    let Some(config) = node.config.as_ref() else {
4605        return;
4606    };
4607    let Some(update_globals) = config.update_globals.as_ref() else {
4608        return;
4609    };
4610
4611    let context = json!({
4612        "input": workflow_input,
4613        "nodes": outputs,
4614        "globals": Value::Object(globals.clone())
4615    });
4616
4617    for (key, update) in update_globals {
4618        match update.op.as_str() {
4619            "set" => {
4620                if let Some(path) = update.from.as_ref() {
4621                    let value = resolve_path(&context, path.as_str())
4622                        .cloned()
4623                        .unwrap_or(Value::Null);
4624                    globals.insert(key.clone(), value);
4625                }
4626            }
4627            "append" => {
4628                if let Some(path) = update.from.as_ref() {
4629                    let value = resolve_path(&context, path.as_str())
4630                        .cloned()
4631                        .unwrap_or(Value::Null);
4632                    let entry = globals
4633                        .entry(key.clone())
4634                        .or_insert_with(|| Value::Array(Vec::new()));
4635                    match entry {
4636                        Value::Array(items) => items.push(value),
4637                        other => {
4638                            let existing = other.clone();
4639                            *other = Value::Array(vec![existing, value]);
4640                        }
4641                    }
4642                }
4643            }
4644            "increment" => {
4645                let by = update.by.unwrap_or(1.0);
4646                let current = globals
4647                    .get(key.as_str())
4648                    .and_then(Value::as_f64)
4649                    .unwrap_or(0.0);
4650                if let Some(next) = serde_json::Number::from_f64(current + by) {
4651                    globals.insert(key.clone(), Value::Number(next));
4652                }
4653            }
4654            "merge" => {
4655                if let Some(path) = update.from.as_ref() {
4656                    let source = resolve_path(&context, path.as_str())
4657                        .cloned()
4658                        .unwrap_or(Value::Null);
4659                    if let Value::Object(source_map) = source {
4660                        let target = globals
4661                            .entry(key.clone())
4662                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
4663                        if let Value::Object(target_map) = target {
4664                            target_map.extend(source_map);
4665                        } else {
4666                            *target = Value::Object(source_map);
4667                        }
4668                    }
4669                }
4670            }
4671            _ => {}
4672        }
4673    }
4674}
4675
4676fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4677    if let Some(schema) = node
4678        .config
4679        .as_ref()
4680        .and_then(|cfg| cfg.output_schema.clone())
4681    {
4682        return schema;
4683    }
4684
4685    default_llm_output_schema()
4686}
4687
4688fn normalize_tool_choice(
4689    config: Option<YamlToolChoiceConfig>,
4690) -> Result<Option<ToolChoice>, String> {
4691    let Some(config) = config else {
4692        return Ok(None);
4693    };
4694
4695    let choice = match config {
4696        YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4697        YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4698            tool_type: ToolType::Function,
4699            function: ToolChoiceFunction { name: function },
4700        }),
4701        YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4702    };
4703
4704    Ok(Some(choice))
4705}
4706
4707fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4708    llm.tools
4709        .iter()
4710        .map(|tool| match (llm.tools_format, tool) {
4711            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4712                let definition = ToolDefinition {
4713                    tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4714                    function: ToolFunction {
4715                        name: openai.function.name.clone(),
4716                        description: openai.function.description.clone(),
4717                        parameters: openai.function.parameters.clone(),
4718                    },
4719                };
4720                Ok(YamlResolvedTool {
4721                    definition,
4722                    output_schema: openai.function.output_schema.clone(),
4723                })
4724            }
4725            (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4726                let definition = ToolDefinition {
4727                    tool_type: ToolType::Function,
4728                    function: ToolFunction {
4729                        name: simple.name.clone(),
4730                        description: simple.description.clone(),
4731                        parameters: Some(simple.input_schema.clone()),
4732                    },
4733                };
4734                Ok(YamlResolvedTool {
4735                    definition,
4736                    output_schema: simple.output_schema.clone(),
4737                })
4738            }
4739            (YamlToolFormat::Openai, _) => {
4740                Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4741            }
4742            (YamlToolFormat::Simplified, _) => {
4743                Err("tools_format=simplified requires simplified tool declarations".to_string())
4744            }
4745        })
4746        .collect()
4747}
4748
4749fn default_llm_output_schema() -> Value {
4750    json!({
4751        "type": "object",
4752        "additionalProperties": true
4753    })
4754}
4755
4756fn mock_rag(topic: &str) -> Value {
4757    let (kb_source, playbook) = match topic {
4758        "probation" => (
4759            "hr_policy/probation.md",
4760            "Collect manager review, performance evidence, and probation timeline.",
4761        ),
4762        "leave_request" => (
4763            "hr_policy/leave.md",
4764            "Validate leave balance, manager approval, and blackout dates.",
4765        ),
4766        "supply_chain_order_assessment" => (
4767            "supply_chain/order_assessment.md",
4768            "Review order specs, inventory risk, and vendor lead-time guidance.",
4769        ),
4770        "supply_chain_order_replacement" => (
4771            "supply_chain/order_replacement.md",
4772            "Collect order id, damage proof, and replacement SLA policy.",
4773        ),
4774        "termination_first_time_offense" => (
4775            "hr_policy/termination_first_offense.md",
4776            "Validate first-incident criteria and route to HRBP review.",
4777        ),
4778        "termination_repeated_offense" => (
4779            "hr_policy/termination_repeated_offense.md",
4780            "Collect prior warnings and escalation approvals before final action.",
4781        ),
4782        _ => (
4783            "shared/request_clarification.md",
4784            "Request clarifying details before routing.",
4785        ),
4786    };
4787
4788    json!({
4789        "kb_source": kb_source,
4790        "playbook": playbook,
4791    })
4792}
4793
/// Runs a nested YAML workflow in response to a `run_workflow_graph` tool
/// call and returns a JSON summary of its result.
///
/// Requirements:
/// * `payload.workflow_id` selects the child workflow; `payload.input`
///   (optional object) seeds the child input.
/// * `context.input.workflow_registry` must map workflow ids to YAML paths.
///
/// Recursion is bounded by the `__subgraph_depth` / `__subgraph_max_depth`
/// keys carried in the input (max defaults to 3). The parent's `messages`,
/// `email_text`, and registry are inherited by the child input only when the
/// payload did not set them itself. Errors are returned as human-readable
/// strings for the tool-call result.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    // The registry locates the child graph on disk; it is also forwarded to
    // the child so nested subcalls keep working.
    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Depth guard: refuse to spawn another level once the configured
    // maximum is reached.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    // Child input starts from payload.input; selected parent fields are
    // inherited only when the payload did not provide them.
    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Depth bookkeeping is always overwritten so the child sees the correct
    // values even if the payload tried to set them.
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    // Thread trace correlation into the child run options.
    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
4903
4904fn build_subworkflow_options(
4905    parent_options: &YamlWorkflowRunOptions,
4906    parent_trace_context: Option<&TraceContext>,
4907    resolved_trace_id: Option<&str>,
4908) -> YamlWorkflowRunOptions {
4909    let mut subworkflow_options = parent_options.clone();
4910    if parent_trace_context.is_some() || resolved_trace_id.is_some() {
4911        let trace_context = YamlWorkflowTraceContextInput {
4912            trace_id: resolved_trace_id
4913                .map(|value| value.to_string())
4914                .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
4915            span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
4916            parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
4917            traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
4918            tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
4919            baggage: parent_trace_context
4920                .map(|ctx| ctx.baggage.clone())
4921                .unwrap_or_default(),
4922        };
4923        subworkflow_options.trace.context = Some(trace_context);
4924    }
4925    subworkflow_options
4926}
4927
4928fn mock_custom_worker_output(
4929    handler: &str,
4930    payload: &Value,
4931) -> Result<Value, YamlWorkflowRunError> {
4932    if handler == "get_employee_record" {
4933        let employee_name = payload
4934            .get("employee_name")
4935            .and_then(Value::as_str)
4936            .unwrap_or("Unknown Employee")
4937            .trim();
4938        let normalized_name = if employee_name.is_empty() {
4939            "Unknown Employee"
4940        } else {
4941            employee_name
4942        };
4943
4944        let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
4945            "alex johnson" => ("EMP-2041", "Austin"),
4946            "priya sharma" => ("EMP-3378", "Bengaluru"),
4947            "marcus lee" => ("EMP-1196", "Singapore"),
4948            "sarah chen" => ("EMP-4450", "Toronto"),
4949            _ => ("EMP-0000", "Unassigned"),
4950        };
4951
4952        return Ok(json!({
4953            "employee_name": normalized_name,
4954            "employee_id": employee_id,
4955            "location": location,
4956        }));
4957    }
4958
4959    if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
4960        let mut value = mock_rag(topic);
4961        if let Value::Object(object) = &mut value {
4962            object.insert("handler".to_string(), Value::String(handler.to_string()));
4963        }
4964        return Ok(value);
4965    }
4966
4967    Err(YamlWorkflowRunError::UnsupportedCustomHandler {
4968        handler: handler.to_string(),
4969    })
4970}
4971
/// Root of a deserialized YAML workflow file.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier.
    pub id: String,
    /// Id of the node the run starts from.
    pub entry_node: String,
    /// Node declarations; empty when omitted.
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Directed edges between node ids; empty when omitted.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
4981
/// A single workflow node: an id, its kind payload, and optional config.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Unique node id referenced by edges and switch targets.
    pub id: String,
    /// Node kind payload (llm_call / switch / custom_worker).
    pub node_type: YamlNodeType,
    /// Optional prompt/schema/globals configuration.
    pub config: Option<YamlNodeConfig>,
}
4988
4989impl YamlNode {
4990    fn kind_name(&self) -> &'static str {
4991        if self.node_type.llm_call.is_some() {
4992            "llm_call"
4993        } else if self.node_type.switch.is_some() {
4994            "switch"
4995        } else if self.node_type.custom_worker.is_some() {
4996            "custom_worker"
4997        } else {
4998            "unknown"
4999        }
5000    }
5001}
5002
/// Kind payload for a node; one field is expected to be set. When several
/// are set, `YamlNode::kind_name` prefers llm_call, then switch, then
/// custom_worker.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    /// Present for LLM-call nodes.
    pub llm_call: Option<YamlLlmCall>,
    /// Present for switch (routing) nodes.
    pub switch: Option<YamlSwitch>,
    /// Present for custom-worker nodes.
    pub custom_worker: Option<YamlCustomWorker>,
}
5009
/// Configuration for an `llm_call` node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model name passed to the completion request.
    pub model: String,
    /// Whether to stream the completion (None = runner default).
    pub stream: Option<bool>,
    // NOTE(review): presumably controls emitting streamed JSON as plain
    // text deltas — confirm against the streaming runner.
    pub stream_json_as_text: Option<bool>,
    /// Whether to run output healing (None = runner default).
    pub heal: Option<bool>,
    // NOTE(review): presumably a dot-path to read chat messages from the
    // context — confirm at the call site.
    pub messages_path: Option<String>,
    // NOTE(review): presumably appends the node prompt as a user message —
    // confirm at the call site.
    pub append_prompt_as_user: Option<bool>,
    /// Declared shape of `tools` entries; defaults to `openai`.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tool declarations available to the model; empty when omitted.
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional tool-choice directive (mode, function shorthand, or
    /// OpenAI-style selector) — see `normalize_tool_choice`.
    pub tool_choice: Option<YamlToolChoiceConfig>,
    // NOTE(review): presumably caps model<->tool roundtrips — confirm at
    // the tool-loop call site.
    pub max_tool_roundtrips: Option<u8>,
    // NOTE(review): presumably the globals key under which tool calls are
    // recorded — confirm at the call site.
    pub tool_calls_global_key: Option<String>,
}
5026
/// Shape of the `tools` entries on an llm_call node; `openai` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    /// OpenAI-style `{type, function}` declarations.
    #[default]
    Openai,
    /// Flat `{name, input_schema, ...}` declarations.
    Simplified,
}
5034
/// A tool declaration in either supported YAML shape; serde picks the first
/// untagged variant that matches. `normalize_llm_tools` enforces that the
/// shape agrees with the node's `tools_format`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    /// OpenAI-style `{type, function: {...}}` declaration.
    OpenAi(YamlOpenAiToolDeclaration),
    /// Flat `{name, input_schema, ...}` declaration.
    Simplified(YamlSimplifiedToolDeclaration),
}

/// OpenAI-style tool wrapper.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// Optional tool type; `normalize_llm_tools` defaults it to `function`.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    /// The function descriptor.
    pub function: YamlOpenAiToolFunction,
}

/// OpenAI-style function descriptor, extended with an optional output schema.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    /// Tool function name.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Optional JSON Schema for the tool arguments.
    pub parameters: Option<Value>,
    /// Optional JSON Schema for the tool result (validated by diagnostics).
    pub output_schema: Option<Value>,
}

/// Simplified flat tool declaration; unlike the OpenAI shape, `input_schema`
/// is required here.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    /// Tool function name.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// JSON Schema for the tool arguments (required in this shape).
    pub input_schema: Value,
    /// Optional JSON Schema for the tool result.
    pub output_schema: Option<Value>,
}
5064
/// YAML-level tool choice: a bare mode value, a `{function: name}` shorthand,
/// or a full OpenAI-style selector. Untagged — the first matching variant
/// wins. Converted by `normalize_tool_choice`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    /// A provider `ToolChoiceMode` value.
    Mode(ToolChoiceMode),
    /// Shorthand naming a single function to select.
    Function { function: String },
    /// Full OpenAI-style tool selector object.
    OpenAi(ToolChoiceTool),
}
5072
/// Switch node configuration: condition branches plus a required default
/// target. Branch and default targets are checked against known node ids by
/// the workflow diagnostics.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    /// Ordered condition/target pairs; empty when omitted.
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id when no branch applies.
    pub default: String,
}

/// One switch branch: a condition expression and the node id to route to.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    // NOTE(review): condition evaluation semantics live in the runner —
    // confirm the expression language at the evaluation site.
    pub condition: String,
    /// Target node id (existence validated by diagnostics).
    pub target: String,
}
5085
/// Custom-worker node configuration: delegates to a named handler
/// (see `mock_custom_worker_output` for the built-in mock handlers).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    /// Handler name dispatched by the custom-worker executor.
    pub handler: String,
}
5090
/// Optional per-node configuration shared across node kinds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    // NOTE(review): presumably a prompt template using `{{ $.path }}`
    // interpolation — confirm where the runner renders it.
    pub prompt: Option<String>,
    /// JSON Schema for the node output; `schema` is accepted as an alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Static payload for the node.
    pub payload: Option<Value>,
    /// Map of global key -> dot-path expression (see `apply_set_globals`).
    pub set_globals: Option<HashMap<String, String>>,
    /// Map of global key -> structured update (see `apply_update_globals`).
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}

/// One `update_globals` entry. `op` must be set|append|increment|merge
/// (validated by diagnostics); `from` is required for every op except
/// increment; `by` only applies to increment.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    /// Operation name: set | append | increment | merge.
    pub op: String,
    /// Source dot-path for set/append/merge.
    pub from: Option<String>,
    /// Increment amount; defaults to 1.0 when absent.
    pub by: Option<f64>,
}
5107
/// A directed edge between two node ids.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    /// Source node id.
    pub from: String,
    /// Destination node id.
    pub to: String,
}
5113
5114#[cfg(test)]
5115mod tests {
5116    use super::*;
5117    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5118    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5119    use simple_agent_type::tool::{ToolCallFunction, ToolType};
5120    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5121    use simple_agents_core::SimpleAgentsClientBuilder;
5122    use std::fs;
5123    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5124    use std::sync::{Arc, Mutex, OnceLock};
5125
    /// Lazily-initialized process-wide mutex.
    // NOTE(review): presumably serializes tests that mutate stream-debug
    // environment state — confirm at the call sites.
    fn stream_debug_env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }
5130
    // Test doubles used throughout the suite below.

    // Stateless executor stub.
    struct MockExecutor;

    // Event sink that records every emitted event for later assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    // Event sink that reports itself cancelled as soon as any event has
    // been emitted, exercising the runner's cancellation path.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }

    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        fn emit(&self, _event: &YamlWorkflowEvent) {
            // Any event flips the cancellation flag.
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }

    // Executor that counts how many times it is invoked.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    // Worker that captures the context value it is handed.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    // Provider that first demands a tool call, then completes (impl below).
    struct ToolLoopProvider;

    // Provider that always emits a call to an undeclared tool (impl below).
    struct UnknownToolProvider;

    // Provider that reports reasoning-token usage (impl below).
    struct ReasoningUsageProvider;

    // NOTE(review): tool-loop variant with reasoning usage — impl not in
    // this view; confirm behavior there.
    struct ToolLoopReasoningProvider;
5166
    // Scripted provider driving a two-turn tool loop:
    //  1. tools declared, no tool result yet -> respond with one
    //     `get_customer_context` tool call;
    //  2. tools declared and a tool-role message present -> finish with
    //     `{"state":"done"}`;
    //  3. no tools -> echo the last user message as `{"subject","body"}`.
    #[async_trait]
    impl Provider for ToolLoopProvider {
        // Masquerades as the OpenAI provider so the client routes to it.
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so execute() can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            // Which conversation state are we in?
            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // First turn: ask for the get_customer_context tool.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Second turn: the tool result arrived — finish the loop.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // No tools: echo the most recent user message back.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5262
    // Scripted provider that always responds with a call to `unknown_tool`,
    // which the workflow under test does not declare — exercises the
    // runner's unknown-tool handling.
    #[async_trait]
    impl Provider for UnknownToolProvider {
        // Masquerades as the OpenAI provider so the client routes to it.
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so execute() can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            // Unconditionally emit a call to an undeclared tool.
            let response = CompletionResponse {
                id: "resp_unknown_tool".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
                        id: "call_unknown".to_string(),
                        tool_type: ToolType::Function,
                        function: ToolCallFunction {
                            name: "unknown_tool".to_string(),
                            arguments: "{}".to_string(),
                        },
                    }]),
                    finish_reason: FinishReason::ToolCalls,
                    logprobs: None,
                }],
                usage: Usage::new(5, 2),
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5308
5309    #[async_trait]
5310    impl Provider for ReasoningUsageProvider {
5311        fn name(&self) -> &str {
5312            "openai"
5313        }
5314
5315        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5316            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5317            Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
5318        }
5319
5320        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5321            let request: CompletionRequest =
5322                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5323
5324            let mut usage = Usage::new(9, 5);
5325            usage.reasoning_tokens = Some(4);
5326            let response = CompletionResponse {
5327                id: "resp_reasoning".to_string(),
5328                model: request.model,
5329                choices: vec![CompletionChoice {
5330                    index: 0,
5331                    message: Message::assistant("{\"state\":\"ok\"}"),
5332                    finish_reason: FinishReason::Stop,
5333                    logprobs: None,
5334                }],
5335                usage,
5336                created: None,
5337                provider: Some(self.name().to_string()),
5338                healing_metadata: None,
5339            };
5340
5341            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5342            Ok(ProviderResponse::new(200, body))
5343        }
5344
5345        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5346            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5347        }
5348    }
5349
    #[async_trait]
    impl Provider for ToolLoopReasoningProvider {
        /// Masquerades as the OpenAI provider so the client routes to it unchanged.
        fn name(&self) -> &str {
            "openai"
        }

        /// Serializes the request verbatim into a mock provider request.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
        }

        /// Two-phase mock of a tool-calling loop. First call (tools offered,
        /// no `Role::Tool` message yet): answers with a `get_customer_context`
        /// tool call. Follow-up call (a tool result is present): answers with
        /// the final JSON payload. Both phases report reasoning tokens so the
        /// runner's usage aggregation across the loop can be asserted.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            // Phase detection: tools were offered, and whether a tool result
            // message has already been appended to the conversation.
            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // Phase 1: request the tool invocation (10/5 usage + 2 reasoning).
                let mut usage = Usage::new(10, 5);
                usage.reasoning_tokens = Some(2);
                CompletionResponse {
                    id: "resp_tool_reasoning_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Phase 2: final structured answer (12/6 usage + 3 reasoning).
                let mut usage = Usage::new(12, 6);
                usage.reasoning_tokens = Some(3);
                CompletionResponse {
                    id: "resp_tool_reasoning_2".to_string(),
                    model: request.model,
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        /// Deserializes the JSON body straight back into a completion response.
        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5422
    /// Custom-worker stub that answers every invocation with a clone of the
    /// same canned JSON payload.
    struct FixedToolWorker {
        payload: Value,
    }
5426
    /// Custom-worker stub that counts how many times `execute` is invoked.
    struct CountingToolWorker {
        // Incremented with SeqCst on each execute call.
        execute_calls: AtomicUsize,
    }
5430
5431    #[async_trait]
5432    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
5433        async fn execute(
5434            &self,
5435            _handler: &str,
5436            _payload: &Value,
5437            _email_text: &str,
5438            _context: &Value,
5439        ) -> Result<Value, String> {
5440            Ok(self.payload.clone())
5441        }
5442    }
5443
5444    #[async_trait]
5445    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
5446        async fn execute(
5447            &self,
5448            _handler: &str,
5449            _payload: &Value,
5450            _email_text: &str,
5451            _context: &Value,
5452        ) -> Result<Value, String> {
5453            self.execute_calls.fetch_add(1, Ordering::SeqCst);
5454            Ok(json!({"ok": true}))
5455        }
5456    }
5457
5458    #[async_trait]
5459    impl YamlWorkflowLlmExecutor for CountingExecutor {
5460        async fn complete_structured(
5461            &self,
5462            _request: YamlLlmExecutionRequest,
5463            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5464        ) -> Result<YamlLlmExecutionResult, String> {
5465            self.call_count.fetch_add(1, Ordering::SeqCst);
5466            Ok(YamlLlmExecutionResult {
5467                payload: json!({"state":"ok"}),
5468                usage: None,
5469                ttft_ms: None,
5470                tool_calls: Vec::new(),
5471            })
5472        }
5473    }
5474
5475    impl YamlWorkflowEventSink for RecordingSink {
5476        fn emit(&self, event: &YamlWorkflowEvent) {
5477            self.events
5478                .lock()
5479                .expect("recording sink lock should not be poisoned")
5480                .push(event.clone());
5481        }
5482    }
5483
5484    #[async_trait]
5485    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
5486        async fn execute(
5487            &self,
5488            _handler: &str,
5489            _payload: &Value,
5490            _email_text: &str,
5491            context: &Value,
5492        ) -> Result<Value, String> {
5493            let mut guard = self
5494                .context
5495                .lock()
5496                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
5497            *guard = Some(context.clone());
5498            Ok(json!({"ok": true}))
5499        }
5500    }
5501
5502    #[async_trait]
5503    impl YamlWorkflowLlmExecutor for MockExecutor {
5504        async fn complete_structured(
5505            &self,
5506            request: YamlLlmExecutionRequest,
5507            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5508        ) -> Result<YamlLlmExecutionResult, String> {
5509            let prompt = request.prompt;
5510            if prompt.contains("exactly one category") {
5511                return Ok(YamlLlmExecutionResult {
5512                    payload: json!({"category":"termination","reason":"mock"}),
5513                    usage: Some(YamlLlmTokenUsage {
5514                        prompt_tokens: 10,
5515                        completion_tokens: 5,
5516                        total_tokens: 15,
5517                        reasoning_tokens: None,
5518                    }),
5519                    ttft_ms: None,
5520                    tool_calls: Vec::new(),
5521                });
5522            }
5523            if prompt.contains("Determine termination subtype") {
5524                return Ok(YamlLlmExecutionResult {
5525                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5526                    usage: Some(YamlLlmTokenUsage {
5527                        prompt_tokens: 12,
5528                        completion_tokens: 6,
5529                        total_tokens: 18,
5530                        reasoning_tokens: None,
5531                    }),
5532                    ttft_ms: None,
5533                    tool_calls: Vec::new(),
5534                });
5535            }
5536            if prompt.contains("Determine supply chain subtype") {
5537                return Ok(YamlLlmExecutionResult {
5538                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
5539                    usage: Some(YamlLlmTokenUsage {
5540                        prompt_tokens: 11,
5541                        completion_tokens: 4,
5542                        total_tokens: 15,
5543                        reasoning_tokens: None,
5544                    }),
5545                    ttft_ms: None,
5546                    tool_calls: Vec::new(),
5547                });
5548            }
5549            Err("unexpected prompt".to_string())
5550        }
5551    }
5552
    #[tokio::test]
    async fn runs_yaml_workflow_and_returns_step_timings() {
        // End-to-end run of a multi-node workflow (LLM classify -> switch ->
        // LLM subtype -> switch -> custom worker) driven by MockExecutor.
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Determine termination subtype:
        {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
            .await
            .expect("yaml workflow should execute");

        assert_eq!(output.workflow_id, "email-intake-classification");
        // MockExecutor answers "termination" then "repeated_offense", so the
        // run must end at the repeated-offense RAG worker.
        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
        // Every traced node should have produced exactly one timing entry.
        assert!(!output.step_timings.is_empty());
        assert_eq!(output.step_timings.len(), output.trace.len());
        assert!(output
            .outputs
            .contains_key("rag_termination_repeated_offense"));
        // Totals are the sums of MockExecutor's two LLM calls:
        // prompts 10 + 12, completions 5 + 6, totals 15 + 18.
        assert_eq!(output.total_input_tokens, 22);
        assert_eq!(output.total_output_tokens, 11);
        assert_eq!(output.total_tokens, 33);
    }
5626
    #[tokio::test]
    async fn emits_resolved_llm_input_event_with_bindings() {
        // Single LLM node whose prompt interpolates {{ input.email_text }};
        // the runner should emit a telemetry event carrying the fully
        // resolved prompt plus per-binding resolution details.
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_email_workflow_yaml_with_custom_worker_and_events(
            &workflow,
            "Need help with termination",
            &MockExecutor,
            None,
            Some(&sink),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.terminal_node, "classify_top_level");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let llm_event = events
            .iter()
            .find(|event| event.event_type == "node_llm_input_resolved")
            .expect("expected llm input telemetry event");

        // Event metadata must echo the node's model/stream/heal settings and
        // carry the resolved prompt text.
        let metadata = llm_event
            .metadata
            .as_ref()
            .expect("llm input event must include metadata");
        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
        assert_eq!(metadata["stream_requested"], Value::Bool(false));
        assert_eq!(metadata["heal_requested"], Value::Bool(false));
        assert!(metadata["prompt"]
            .as_str()
            .expect("prompt should be a string")
            .contains("Need help with termination"));

        // Exactly one template binding: input.email_text resolved to the
        // email text passed into the run, typed as a string.
        let bindings = metadata["bindings"]
            .as_array()
            .expect("bindings should be an array");
        assert_eq!(bindings.len(), 1);
        assert_eq!(
            bindings[0]["source_path"],
            Value::String("input.email_text".to_string())
        );
        assert_eq!(
            bindings[0]["resolved"],
            Value::String("Need help with termination".to_string())
        );
        assert_eq!(bindings[0]["missing"], Value::Bool(false));
        assert_eq!(
            bindings[0]["resolved_type"],
            Value::String("string".to_string())
        );
    }
5699
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_by_default() {
        // With default run options, the workflow_completed event must carry a
        // nerdstats block built from provider-reported usage.
        let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // The nerdstats block mirrors the run output's identifiers and totals.
        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
        assert_eq!(
            nerdstats["terminal_node"],
            Value::String(output.terminal_node)
        );
        assert_eq!(
            nerdstats["total_tokens"],
            Value::Number(output.total_tokens.into())
        );
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_usage".to_string())
        );
        // Per-step data is exposed as step_details only; the legacy
        // step_timings/llm_node_metrics/llm_node_models keys must be absent.
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert!(nerdstats.get("step_details").is_some());
        assert!(nerdstats.get("llm_node_models").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
        // MockExecutor reports no TTFT for non-streaming calls.
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
5776
    #[tokio::test]
    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
        // Disabling telemetry.nerdstats must drop the workflow_completed
        // event's metadata entirely.
        let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };
        // Same defaults as the happy-path test, except nerdstats is off.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                nerdstats: false,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &options,
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        assert!(completed.metadata.is_none());
    }
5826
    /// LLM executor stub that reports a TTFT only when streaming is requested.
    struct StreamAwareMockExecutor;
5828
5829    #[async_trait]
5830    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
5831        async fn complete_structured(
5832            &self,
5833            request: YamlLlmExecutionRequest,
5834            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5835        ) -> Result<YamlLlmExecutionResult, String> {
5836            Ok(YamlLlmExecutionResult {
5837                payload: json!({"state":"ok"}),
5838                usage: Some(YamlLlmTokenUsage {
5839                    prompt_tokens: 20,
5840                    completion_tokens: 10,
5841                    total_tokens: 30,
5842                    reasoning_tokens: None,
5843                }),
5844                ttft_ms: if request.stream { Some(12) } else { None },
5845                tool_calls: Vec::new(),
5846            })
5847        }
5848    }
5849
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
        // A streaming LLM node whose executor still reports usage: nerdstats
        // must include both the token totals and the measured TTFT.
        let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
    config:
      prompt: |
        Return JSON only:
        {"state":"ok"}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &StreamAwareMockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // StreamAwareMockExecutor reports 30 total tokens and a 12 ms TTFT
        // when stream: true is set on the node.
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
    }
5913
5914    #[test]
5915    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
5916        let output = YamlWorkflowRunOutput {
5917            workflow_id: "workflow".to_string(),
5918            entry_node: "start".to_string(),
5919            email_text: "hello".to_string(),
5920            trace: vec!["llm_node".to_string()],
5921            outputs: BTreeMap::new(),
5922            terminal_node: "llm_node".to_string(),
5923            terminal_output: None,
5924            step_timings: vec![YamlStepTiming {
5925                node_id: "llm_node".to_string(),
5926                node_kind: "llm_call".to_string(),
5927                model_name: Some("gpt-4.1".to_string()),
5928                elapsed_ms: 100,
5929                prompt_tokens: None,
5930                completion_tokens: None,
5931                total_tokens: None,
5932                reasoning_tokens: None,
5933                tokens_per_second: None,
5934            }],
5935            llm_node_metrics: BTreeMap::new(),
5936            llm_node_models: BTreeMap::new(),
5937            total_elapsed_ms: 100,
5938            ttft_ms: None,
5939            total_input_tokens: 0,
5940            total_output_tokens: 0,
5941            total_tokens: 0,
5942            total_reasoning_tokens: None,
5943            tokens_per_second: 0.0,
5944            trace_id: Some("trace-1".to_string()),
5945            metadata: None,
5946        };
5947
5948        let nerdstats = workflow_nerdstats(&output);
5949        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
5950        assert_eq!(
5951            nerdstats["token_metrics_source"],
5952            Value::String("provider_stream_usage_unavailable".to_string())
5953        );
5954        assert_eq!(nerdstats["total_tokens"], Value::Null);
5955        assert_eq!(nerdstats["ttft_ms"], Value::Null);
5956        assert!(nerdstats.get("step_timings").is_none());
5957        assert!(nerdstats.get("llm_node_metrics").is_none());
5958        assert_eq!(
5959            nerdstats["step_details"][0]["node_id"],
5960            Value::String("llm_node".to_string())
5961        );
5962        assert_eq!(
5963            nerdstats["step_details"][0]["model_name"],
5964            Value::String("gpt-4.1".to_string())
5965        );
5966    }
5967
5968    #[test]
5969    fn workflow_nerdstats_includes_ttft_when_available() {
5970        let output = YamlWorkflowRunOutput {
5971            workflow_id: "workflow".to_string(),
5972            entry_node: "start".to_string(),
5973            email_text: "hello".to_string(),
5974            trace: vec!["llm_node".to_string()],
5975            outputs: BTreeMap::new(),
5976            terminal_node: "llm_node".to_string(),
5977            terminal_output: None,
5978            step_timings: vec![YamlStepTiming {
5979                node_id: "llm_node".to_string(),
5980                node_kind: "llm_call".to_string(),
5981                model_name: Some("gpt-4.1".to_string()),
5982                elapsed_ms: 100,
5983                prompt_tokens: Some(10),
5984                completion_tokens: Some(15),
5985                total_tokens: Some(25),
5986                reasoning_tokens: None,
5987                tokens_per_second: Some(150.0),
5988            }],
5989            llm_node_metrics: BTreeMap::new(),
5990            llm_node_models: BTreeMap::new(),
5991            total_elapsed_ms: 100,
5992            ttft_ms: Some(42),
5993            total_input_tokens: 10,
5994            total_output_tokens: 15,
5995            total_tokens: 25,
5996            total_reasoning_tokens: None,
5997            tokens_per_second: 150.0,
5998            trace_id: Some("trace-2".to_string()),
5999            metadata: None,
6000        };
6001
6002        let nerdstats = workflow_nerdstats(&output);
6003        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
6004        assert!(nerdstats.get("step_timings").is_none());
6005        assert!(nerdstats.get("llm_node_metrics").is_none());
6006        assert_eq!(
6007            nerdstats["step_details"][0]["node_id"],
6008            Value::String("llm_node".to_string())
6009        );
6010        assert_eq!(
6011            nerdstats["step_details"][0]["model_name"],
6012            Value::String("gpt-4.1".to_string())
6013        );
6014    }
6015
    #[test]
    fn workflow_nerdstats_schema_contract_is_stable() {
        // Golden test: the full nerdstats JSON shape is pinned below. Any key
        // rename, addition, or removal fails this test and must be a
        // deliberate contract change.
        let output = YamlWorkflowRunOutput {
            workflow_id: "schema-workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["classify".to_string(), "route".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "route".to_string(),
            terminal_output: None,
            step_timings: vec![
                // An LLM step with every optional metric populated...
                YamlStepTiming {
                    node_id: "classify".to_string(),
                    node_kind: "llm_call".to_string(),
                    model_name: Some("gpt-4.1".to_string()),
                    elapsed_ms: 100,
                    prompt_tokens: Some(11),
                    completion_tokens: Some(22),
                    total_tokens: Some(33),
                    reasoning_tokens: Some(7),
                    tokens_per_second: Some(220.0),
                },
                // ...and a switch step with none, so optional-field omission
                // in step_details is exercised too.
                YamlStepTiming {
                    node_id: "route".to_string(),
                    node_kind: "switch".to_string(),
                    model_name: None,
                    elapsed_ms: 0,
                    prompt_tokens: None,
                    completion_tokens: None,
                    total_tokens: None,
                    reasoning_tokens: None,
                    tokens_per_second: None,
                },
            ],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: Some(9),
            total_input_tokens: 11,
            total_output_tokens: 22,
            total_tokens: 33,
            total_reasoning_tokens: Some(7),
            tokens_per_second: 220.0,
            trace_id: Some("trace-schema".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        // Note: the switch step serializes only node_id/node_kind/elapsed_ms;
        // None-valued metrics are omitted, not emitted as null.
        let expected = json!({
            "workflow_id": "schema-workflow",
            "terminal_node": "route",
            "total_elapsed_ms": 100,
            "ttft_ms": 9,
            "step_details": [
                {
                    "node_id": "classify",
                    "node_kind": "llm_call",
                    "model_name": "gpt-4.1",
                    "elapsed_ms": 100,
                    "prompt_tokens": 11,
                    "completion_tokens": 22,
                    "total_tokens": 33,
                    "reasoning_tokens": 7,
                    "tokens_per_second": 220.0
                },
                {
                    "node_id": "route",
                    "node_kind": "switch",
                    "elapsed_ms": 0
                }
            ],
            "total_input_tokens": 11,
            "total_output_tokens": 22,
            "total_tokens": 33,
            "total_reasoning_tokens": 7,
            "tokens_per_second": 220.0,
            "trace_id": "trace-schema",
            "token_metrics_available": true,
            "token_metrics_source": "provider_usage",
            "llm_nodes_without_usage": []
        });

        assert_eq!(nerdstats, expected);
    }
6100
    /// LLM executor stub that validates the chat history forwarded from the
    /// workflow input before answering with a fixed classification.
    struct MessageHistoryExecutor;
6102
6103    #[async_trait]
6104    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
6105        async fn complete_structured(
6106            &self,
6107            request: YamlLlmExecutionRequest,
6108            _event_sink: Option<&dyn YamlWorkflowEventSink>,
6109        ) -> Result<YamlLlmExecutionResult, String> {
6110            let messages = request
6111                .messages
6112                .ok_or_else(|| "expected messages in request".to_string())?;
6113            if messages.len() != 2 {
6114                return Err(format!("expected 2 messages, got {}", messages.len()));
6115            }
6116            Ok(YamlLlmExecutionResult {
6117                payload: json!({"category":"termination","reason":"history"}),
6118                usage: Some(YamlLlmTokenUsage {
6119                    prompt_tokens: 7,
6120                    completion_tokens: 3,
6121                    total_tokens: 10,
6122                    reasoning_tokens: None,
6123                }),
6124                ttft_ms: None,
6125                tool_calls: Vec::new(),
6126            })
6127        }
6128    }
6129
6130    #[tokio::test]
6131    async fn supports_messages_path_in_workflow_input() {
6132        let yaml = r#"
6133id: email-intake-classification
6134entry_node: classify_top_level
6135nodes:
6136  - id: classify_top_level
6137    node_type:
6138      llm_call:
6139        model: gpt-4.1
6140        messages_path: input.messages
6141        append_prompt_as_user: false
6142"#;
6143
6144        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6145        let input = json!({
6146            "email_text": "ignored",
6147            "messages": [
6148                {"role": "system", "content": "You are a classifier"},
6149                {"role": "user", "content": "Please classify this"}
6150            ]
6151        });
6152
6153        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6154            .await
6155            .expect("workflow should use chat history from input");
6156
6157        assert_eq!(output.terminal_node, "classify_top_level");
6158        assert_eq!(
6159            output.outputs["classify_top_level"]["output"]["reason"],
6160            Value::String("history".to_string())
6161        );
6162    }
6163
6164    #[tokio::test]
6165    async fn wrapper_entrypoints_produce_equivalent_outputs() {
6166        let yaml = r#"
6167id: wrapper-equivalence
6168entry_node: classify
6169nodes:
6170  - id: classify
6171    node_type:
6172      llm_call:
6173        model: gpt-4.1
6174    config:
6175      prompt: |
6176        Classify this email into exactly one category:
6177        {{ input.email_text }}
6178"#;
6179
6180        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6181        let input = json!({"email_text":"hello"});
6182
6183        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
6184            .await
6185            .expect("base entrypoint should execute");
6186        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
6187            .await
6188            .expect("custom worker wrapper should execute");
6189        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
6190            &workflow,
6191            &input,
6192            &MockExecutor,
6193            None,
6194            None,
6195            &YamlWorkflowRunOptions::default(),
6196        )
6197        .await
6198        .expect("events/options wrapper should execute");
6199
6200        assert_eq!(a.workflow_id, b.workflow_id);
6201        assert_eq!(a.workflow_id, c.workflow_id);
6202        assert_eq!(a.terminal_node, b.terminal_node);
6203        assert_eq!(a.terminal_node, c.terminal_node);
6204        assert_eq!(a.outputs, b.outputs);
6205        assert_eq!(a.outputs, c.outputs);
6206        assert_eq!(a.total_tokens, b.total_tokens);
6207        assert_eq!(a.total_tokens, c.total_tokens);
6208    }
6209
    #[tokio::test]
    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
        // End-to-end tool-calling run: the first node performs one tool
        // roundtrip and records the call under the `audit` global key
        // (via `tool_calls_global_key`); the second node then references
        // that global from its prompt template.
        let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
      output_schema:
        type: object
        properties:
          subject: { type: string }
          body: { type: string }
        required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ToolLoopProvider drives the model side of the tool roundtrip.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // FixedToolWorker answers every tool call with this payload.
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        // Both nodes ran, in order.
        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
        // The tool call (including its output) was captured on the first node.
        assert_eq!(
            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
            Value::String("Ava".to_string())
        );
        // The second node's prompt resolved the `globals.audit...` reference.
        let body = output.outputs["personalize"]["output"]["body"]
            .as_str()
            .expect("body should be string");
        assert!(body.contains("Ava"));
    }
6292
6293    #[tokio::test]
6294    async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
6295        let yaml = r#"
6296id: reasoning-usage-workflow
6297entry_node: classify
6298nodes:
6299  - id: classify
6300    node_type:
6301      llm_call:
6302        model: gpt-4.1
6303    config:
6304      prompt: |
6305        Return JSON only:
6306        {"state":"ok"}
6307      output_schema:
6308        type: object
6309        properties:
6310          state: { type: string }
6311        required: [state]
6312"#;
6313
6314        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6315        let client = SimpleAgentsClientBuilder::new()
6316            .with_provider(Arc::new(ReasoningUsageProvider))
6317            .build()
6318            .expect("client should build");
6319
6320        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6321            &workflow,
6322            &json!({"email_text":"hello"}),
6323            &client,
6324            None,
6325            None,
6326            &YamlWorkflowRunOptions::default(),
6327        )
6328        .await
6329        .expect("workflow should execute");
6330
6331        assert_eq!(output.total_reasoning_tokens, Some(4));
6332        assert_eq!(output.step_timings.len(), 1);
6333        assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));
6334
6335        let nerdstats = workflow_nerdstats(&output);
6336        assert_eq!(
6337            nerdstats["total_reasoning_tokens"],
6338            Value::Number(4u64.into())
6339        );
6340        assert_eq!(
6341            nerdstats["step_details"][0]["reasoning_tokens"],
6342            Value::Number(4u64.into())
6343        );
6344    }
6345
6346    #[tokio::test]
6347    async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
6348        let yaml = r#"
6349id: tool-reasoning-workflow
6350entry_node: generate_with_tool
6351nodes:
6352  - id: generate_with_tool
6353    node_type:
6354      llm_call:
6355        model: gpt-4.1
6356        tools_format: simplified
6357        max_tool_roundtrips: 1
6358        tools:
6359          - name: get_customer_context
6360            input_schema:
6361              type: object
6362              properties:
6363                order_id: { type: string }
6364              required: [order_id]
6365              additionalProperties: false
6366            output_schema:
6367              type: object
6368              properties:
6369                customer_name: { type: string }
6370              required: [customer_name]
6371              additionalProperties: false
6372    config:
6373      output_schema:
6374        type: object
6375        properties:
6376          state: { type: string }
6377        required: [state]
6378"#;
6379
6380        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6381        let client = SimpleAgentsClientBuilder::new()
6382            .with_provider(Arc::new(ToolLoopReasoningProvider))
6383            .build()
6384            .expect("client should build");
6385        let worker = FixedToolWorker {
6386            payload: json!({"customer_name": "Ava"}),
6387        };
6388
6389        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6390            &workflow,
6391            &json!({"email_text":"hello"}),
6392            &client,
6393            Some(&worker),
6394            None,
6395            &YamlWorkflowRunOptions::default(),
6396        )
6397        .await
6398        .expect("workflow should execute");
6399
6400        assert_eq!(output.total_reasoning_tokens, Some(5));
6401        assert_eq!(output.step_timings.len(), 1);
6402        assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));
6403
6404        let nerdstats = workflow_nerdstats(&output);
6405        assert_eq!(
6406            nerdstats["total_reasoning_tokens"],
6407            Value::Number(5u64.into())
6408        );
6409        assert_eq!(
6410            nerdstats["step_details"][0]["reasoning_tokens"],
6411            Value::Number(5u64.into())
6412        );
6413    }
6414
6415    #[tokio::test]
6416    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
6417        let yaml = r#"
6418id: tool-calling-schema-fail
6419entry_node: generate_with_tool
6420nodes:
6421  - id: generate_with_tool
6422    node_type:
6423      llm_call:
6424        model: gpt-4.1
6425        tools_format: simplified
6426        max_tool_roundtrips: 1
6427        tools:
6428          - name: get_customer_context
6429            input_schema:
6430              type: object
6431              properties:
6432                order_id: { type: string }
6433              required: [order_id]
6434              additionalProperties: false
6435            output_schema:
6436              type: object
6437              properties:
6438                customer_name: { type: string }
6439              required: [customer_name]
6440              additionalProperties: false
6441    config:
6442      output_schema:
6443        type: object
6444        properties:
6445          state: { type: string }
6446        required: [state]
6447"#;
6448
6449        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6450        let client = SimpleAgentsClientBuilder::new()
6451            .with_provider(Arc::new(ToolLoopProvider))
6452            .build()
6453            .expect("client should build");
6454        let worker = FixedToolWorker {
6455            payload: json!({"unexpected": "shape"}),
6456        };
6457
6458        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6459            &workflow,
6460            &json!({"email_text":"hello"}),
6461            &client,
6462            Some(&worker),
6463            None,
6464            &YamlWorkflowRunOptions::default(),
6465        )
6466        .await
6467        .expect_err("workflow should hard-fail on schema mismatch");
6468
6469        match error {
6470            YamlWorkflowRunError::Llm { message, .. } => {
6471                assert!(message.contains("output failed schema validation"));
6472            }
6473            other => panic!("expected llm error, got {other:?}"),
6474        }
6475    }
6476
6477    #[tokio::test]
6478    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
6479        let yaml = r#"
6480id: unknown-tool-rejected
6481entry_node: generate_with_tool
6482nodes:
6483  - id: generate_with_tool
6484    node_type:
6485      llm_call:
6486        model: gpt-4.1
6487        tools_format: simplified
6488        max_tool_roundtrips: 1
6489        tools:
6490          - name: get_customer_context
6491            input_schema:
6492              type: object
6493              properties:
6494                order_id: { type: string }
6495              required: [order_id]
6496    config:
6497      output_schema:
6498        type: object
6499        properties:
6500          state: { type: string }
6501        required: [state]
6502"#;
6503
6504        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6505        let client = SimpleAgentsClientBuilder::new()
6506            .with_provider(Arc::new(UnknownToolProvider))
6507            .build()
6508            .expect("client should build");
6509        let worker = CountingToolWorker {
6510            execute_calls: AtomicUsize::new(0),
6511        };
6512
6513        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6514            &workflow,
6515            &json!({"email_text":"hello"}),
6516            &client,
6517            Some(&worker),
6518            None,
6519            &YamlWorkflowRunOptions::default(),
6520        )
6521        .await
6522        .expect_err("workflow should reject unknown tool before executing worker");
6523
6524        match error {
6525            YamlWorkflowRunError::Llm { message, .. } => {
6526                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
6527            }
6528            other => panic!("expected llm error, got {other:?}"),
6529        }
6530
6531        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
6532    }
6533
6534    #[test]
6535    fn validates_tools_format_mismatch() {
6536        let yaml = r#"
6537id: mismatch
6538entry_node: generate
6539nodes:
6540  - id: generate
6541    node_type:
6542      llm_call:
6543        model: gpt-4.1
6544        tools_format: openai
6545        tools:
6546          - name: get_customer_context
6547            input_schema:
6548              type: object
6549              properties:
6550                order_id: { type: string }
6551              required: [order_id]
6552"#;
6553
6554        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6555        let diagnostics = verify_yaml_workflow(&workflow);
6556        assert!(diagnostics
6557            .iter()
6558            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
6559    }
6560
6561    #[test]
6562    fn mock_custom_worker_supports_get_employee_record() {
6563        let result = mock_custom_worker_output(
6564            "get_employee_record",
6565            &json!({"employee_name": "Alex Johnson"}),
6566        )
6567        .expect("mock tool should resolve employee record");
6568
6569        assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
6570        assert_eq!(result["location"], Value::String("Austin".to_string()));
6571    }
6572
    #[tokio::test]
    async fn custom_worker_receives_trace_context_block() {
        // Runs a single custom-worker node with fixed trace/tenant options and
        // asserts the worker's payload mirrors them under `trace.context` and
        // `trace.tenant`.
        let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: demo
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // CapturingWorker stores whatever context block it is handed.
        let worker = CapturingWorker {
            context: Mutex::new(None),
        };
        let options = YamlWorkflowRunOptions {
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-ctx".to_string()),
                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                    request_id: Some("turn-7".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            Some(&worker),
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        let captured_context = worker
            .context
            .lock()
            .expect("capturing worker lock should not be poisoned")
            .clone()
            .expect("custom worker should receive context");

        // `trace.context` carries the caller-supplied ids verbatim.
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("trace_id"))
                .and_then(Value::as_str),
            Some("trace-fixed-ctx")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("traceparent"))
                .and_then(Value::as_str),
            Some("00-trace-fixed-ctx-span-fixed-01")
        );
        // `trace.tenant` carries the tenant correlation ids.
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
        );
    }
6651
6652    #[tokio::test]
6653    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
6654        let yaml = r#"
6655id: cancellation-test
6656entry_node: classify
6657nodes:
6658  - id: classify
6659    node_type:
6660      llm_call:
6661        model: gpt-4.1
6662    config:
6663      prompt: |
6664        Classify this email into exactly one category:
6665        {{ input.email_text }}
6666"#;
6667
6668        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6669        let executor = CountingExecutor {
6670            call_count: AtomicUsize::new(0),
6671        };
6672        let sink = CancelAfterFirstEventSink {
6673            cancelled: AtomicBool::new(false),
6674        };
6675
6676        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6677            &workflow,
6678            &json!({"email_text":"hello"}),
6679            &executor,
6680            None,
6681            Some(&sink),
6682            &YamlWorkflowRunOptions::default(),
6683        )
6684        .await
6685        .expect_err("workflow should stop when sink signals cancellation");
6686
6687        assert!(matches!(
6688            err,
6689            YamlWorkflowRunError::EventSinkCancelled { .. }
6690        ));
6691        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
6692    }
6693
6694    #[tokio::test]
6695    async fn rejects_invalid_messages_path_shape() {
6696        let yaml = r#"
6697id: email-intake-classification
6698entry_node: classify_top_level
6699nodes:
6700  - id: classify_top_level
6701    node_type:
6702      llm_call:
6703        model: gpt-4.1
6704        messages_path: input.messages
6705"#;
6706
6707        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6708        let input = json!({
6709            "email_text": "ignored",
6710            "messages": "not-a-list"
6711        });
6712
6713        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6714            .await
6715            .expect_err("workflow should fail for invalid messages shape");
6716
6717        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
6718    }
6719
6720    #[test]
6721    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
6722        let yaml = r#"
6723id: chat-workflow
6724entry_node: decide
6725nodes:
6726  - id: decide
6727    node_type:
6728      switch:
6729        branches:
6730          - condition: '$.input.mode == "draft"'
6731            target: draft
6732        default: ask
6733  - id: draft
6734    node_type:
6735      llm_call:
6736        model: gpt-4.1
6737  - id: ask
6738    node_type:
6739      llm_call:
6740        model: gpt-4.1
6741edges:
6742  - from: draft
6743    to: ask
6744"#;
6745
6746        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6747        let mermaid = yaml_workflow_to_mermaid(&workflow);
6748
6749        assert!(mermaid.contains("flowchart TD"));
6750        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
6751        assert!(mermaid.contains("decide -- \"default\" --> ask"));
6752        assert!(mermaid.contains("draft --> ask"));
6753    }
6754
6755    #[test]
6756    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
6757        let yaml = r#"
6758id: tool-graph
6759entry_node: chat
6760nodes:
6761  - id: chat
6762    node_type:
6763      llm_call:
6764        model: gemini-3-flash
6765        tools_format: simplified
6766        tools:
6767          - name: run_workflow_graph
6768            input_schema:
6769              type: object
6770              properties:
6771                workflow_id: { type: string }
6772              required: [workflow_id]
6773edges: []
6774"#;
6775
6776        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6777        let mermaid = yaml_workflow_to_mermaid(&workflow);
6778
6779        assert!(mermaid.contains("chat__tool_0"));
6780        assert!(mermaid.contains("chat -.-> chat__tool_0"));
6781        assert!(mermaid.contains("classDef toolNode"));
6782        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
6783    }
6784
6785    #[test]
6786    fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6787        let base_dir = std::env::temp_dir().join(format!(
6788            "simple_agents_mermaid_subgraph_{}",
6789            std::time::SystemTime::now()
6790                .duration_since(std::time::UNIX_EPOCH)
6791                .expect("unix epoch")
6792                .as_nanos()
6793        ));
6794        fs::create_dir_all(&base_dir).expect("temp dir should be created");
6795
6796        let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6797        let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6798
6799        let orchestrator_yaml = r#"
6800id: email-chat-orchestrator-with-subgraph-tool
6801entry_node: respond_casual
6802nodes:
6803  - id: respond_casual
6804    node_type:
6805      llm_call:
6806        model: gemini-3-flash
6807        tools_format: simplified
6808        tools:
6809          - name: run_workflow_graph
6810            input_schema:
6811              type: object
6812              properties:
6813                workflow_id: { type: string }
6814              required: [workflow_id]
6815    config:
6816      prompt: |
6817        Call with:
6818        {
6819          "workflow_id": "hr_warning_email_subgraph"
6820        }
6821edges: []
6822"#;
6823
6824        let subgraph_yaml = r#"
6825id: hr-warning-email-subgraph
6826entry_node: draft_hr_warning_email
6827nodes:
6828  - id: draft_hr_warning_email
6829    node_type:
6830      llm_call:
6831        model: gemini-3-flash
6832edges: []
6833"#;
6834
6835        fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
6836        fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
6837
6838        let mermaid =
6839            yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
6840
6841        assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
6842        assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
6843        assert!(mermaid.contains("calls hr_warning_email_subgraph"));
6844        assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
6845
6846        fs::remove_dir_all(base_dir).expect("temp dir removed");
6847    }
6848
6849    #[test]
6850    fn converts_yaml_workflow_to_ir_definition() {
6851        let yaml = r#"
6852id: chat-workflow
6853entry_node: classify
6854nodes:
6855  - id: classify
6856    node_type:
6857      llm_call:
6858        model: gpt-4.1
6859    config:
6860      prompt: |
6861        classify
6862  - id: route
6863    node_type:
6864      switch:
6865        branches:
6866          - condition: '$.nodes.classify.output.kind == "x"'
6867            target: done
6868        default: done
6869  - id: done
6870    node_type:
6871      custom_worker:
6872        handler: GetRagData
6873    config:
6874      payload:
6875        topic: test
6876edges:
6877  - from: classify
6878    to: route
6879"#;
6880
6881        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6882        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
6883
6884        assert_eq!(ir.name, "chat-workflow");
6885        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
6886        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
6887        assert!(ir.nodes.iter().any(|n| n.id == "route"));
6888        assert!(ir.nodes.iter().any(|n| n.id == "done"));
6889    }
6890
6891    #[test]
6892    fn supports_yaml_to_ir_when_messages_path_is_used() {
6893        let yaml = r#"
6894id: chat-workflow
6895entry_node: classify
6896nodes:
6897  - id: classify
6898    node_type:
6899      llm_call:
6900        model: gpt-4.1
6901        messages_path: input.messages
6902"#;
6903
6904        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6905        let ir =
6906            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
6907        assert!(ir.nodes.iter().any(|node| matches!(
6908            node.kind,
6909            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
6910        )));
6911    }
6912
6913    #[tokio::test]
6914    async fn workflow_output_contains_trace_id_in_both_locations() {
6915        let yaml = r#"
6916id: trace-test
6917entry_node: classify
6918nodes:
6919  - id: classify
6920    node_type:
6921      llm_call:
6922        model: gpt-4.1
6923    config:
6924      prompt: |
6925        Classify this email into exactly one category:
6926        {{ input.email_text }}
6927"#;
6928
6929        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6930        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
6931            .await
6932            .expect("workflow should execute");
6933
6934        let trace_id = output
6935            .trace_id
6936            .as_deref()
6937            .expect("trace_id should be present");
6938        assert!(!trace_id.is_empty());
6939        assert_eq!(
6940            output.metadata.as_ref().and_then(|value| {
6941                value
6942                    .get("telemetry")
6943                    .and_then(|telemetry| telemetry.get("trace_id"))
6944                    .and_then(Value::as_str)
6945            }),
6946            Some(trace_id)
6947        );
6948    }
6949
6950    #[tokio::test]
6951    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
6952        let yaml = r#"
6953id: sample-rate-zero
6954entry_node: classify
6955nodes:
6956  - id: classify
6957    node_type:
6958      llm_call:
6959        model: gpt-4.1
6960    config:
6961      prompt: |
6962        Classify this email into exactly one category:
6963        {{ input.email_text }}
6964"#;
6965
6966        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6967        let options = YamlWorkflowRunOptions {
6968            telemetry: YamlWorkflowTelemetryConfig {
6969                sample_rate: 0.0,
6970                ..YamlWorkflowTelemetryConfig::default()
6971            },
6972            trace: YamlWorkflowTraceOptions {
6973                context: Some(YamlWorkflowTraceContextInput {
6974                    trace_id: Some("trace-sample-zero".to_string()),
6975                    ..YamlWorkflowTraceContextInput::default()
6976                }),
6977                tenant: YamlWorkflowTraceTenantContext::default(),
6978            },
6979            model: None,
6980        };
6981
6982        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6983            &workflow,
6984            &json!({"email_text":"hello"}),
6985            &MockExecutor,
6986            None,
6987            None,
6988            &options,
6989        )
6990        .await
6991        .expect("workflow should execute");
6992
6993        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
6994        assert_eq!(
6995            output
6996                .metadata
6997                .as_ref()
6998                .and_then(|value| value.get("telemetry"))
6999                .and_then(|telemetry| telemetry.get("sampled"))
7000                .and_then(Value::as_bool),
7001            Some(false)
7002        );
7003    }
7004
7005    #[test]
7006    fn trace_id_from_traceparent_parses_w3c_header() {
7007        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
7008        assert_eq!(
7009            trace_id_from_traceparent(traceparent).as_deref(),
7010            Some("4bf92f3577b34da6a3ce929d0e0e4736")
7011        );
7012    }
7013
7014    #[test]
7015    fn resolve_telemetry_context_marks_traceparent_source() {
7016        let mut options = YamlWorkflowRunOptions::default();
7017        options.trace.context = Some(YamlWorkflowTraceContextInput {
7018            traceparent: Some(
7019                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
7020            ),
7021            ..YamlWorkflowTraceContextInput::default()
7022        });
7023
7024        let context = resolve_telemetry_context(&options, None);
7025
7026        assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
7027        assert_eq!(
7028            context.trace_id.as_deref(),
7029            Some("4bf92f3577b34da6a3ce929d0e0e4736")
7030        );
7031    }
7032
7033    #[tokio::test]
7034    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
7035        let yaml = r#"
7036id: traceparent-derived
7037entry_node: classify
7038nodes:
7039  - id: classify
7040    node_type:
7041      llm_call:
7042        model: gpt-4.1
7043    config:
7044      prompt: |
7045        Classify this email into exactly one category:
7046        {{ input.email_text }}
7047"#;
7048
7049        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7050        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
7051        let options = YamlWorkflowRunOptions {
7052            telemetry: YamlWorkflowTelemetryConfig {
7053                sample_rate: 0.0,
7054                ..YamlWorkflowTelemetryConfig::default()
7055            },
7056            trace: YamlWorkflowTraceOptions {
7057                context: Some(YamlWorkflowTraceContextInput {
7058                    traceparent: Some(traceparent.to_string()),
7059                    ..YamlWorkflowTraceContextInput::default()
7060                }),
7061                tenant: YamlWorkflowTraceTenantContext::default(),
7062            },
7063            model: None,
7064        };
7065
7066        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7067            &workflow,
7068            &json!({"email_text":"hello"}),
7069            &MockExecutor,
7070            None,
7071            None,
7072            &options,
7073        )
7074        .await
7075        .expect("workflow should execute");
7076
7077        assert_eq!(
7078            output.trace_id.as_deref(),
7079            Some("4bf92f3577b34da6a3ce929d0e0e4736")
7080        );
7081        assert_eq!(
7082            output
7083                .metadata
7084                .as_ref()
7085                .and_then(|value| value.get("telemetry"))
7086                .and_then(|telemetry| telemetry.get("sampled"))
7087                .and_then(Value::as_bool),
7088            Some(false)
7089        );
7090    }
7091
7092    #[tokio::test]
7093    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
7094        let yaml = r#"
7095id: sample-rate-one
7096entry_node: classify
7097nodes:
7098  - id: classify
7099    node_type:
7100      llm_call:
7101        model: gpt-4.1
7102    config:
7103      prompt: |
7104        Classify this email into exactly one category:
7105        {{ input.email_text }}
7106"#;
7107
7108        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7109        let options = YamlWorkflowRunOptions {
7110            telemetry: YamlWorkflowTelemetryConfig {
7111                sample_rate: 1.0,
7112                ..YamlWorkflowTelemetryConfig::default()
7113            },
7114            trace: YamlWorkflowTraceOptions {
7115                context: Some(YamlWorkflowTraceContextInput {
7116                    trace_id: Some("trace-sample-one".to_string()),
7117                    ..YamlWorkflowTraceContextInput::default()
7118                }),
7119                tenant: YamlWorkflowTraceTenantContext::default(),
7120            },
7121            model: None,
7122        };
7123
7124        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7125            &workflow,
7126            &json!({"email_text":"hello"}),
7127            &MockExecutor,
7128            None,
7129            None,
7130            &options,
7131        )
7132        .await
7133        .expect("workflow should execute");
7134
7135        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
7136        assert_eq!(
7137            output
7138                .metadata
7139                .as_ref()
7140                .and_then(|value| value.get("telemetry"))
7141                .and_then(|telemetry| telemetry.get("sampled"))
7142                .and_then(Value::as_bool),
7143            Some(true)
7144        );
7145    }
7146
7147    #[tokio::test]
7148    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
7149        let yaml = r#"
7150id: sample-rate-deterministic
7151entry_node: classify
7152nodes:
7153  - id: classify
7154    node_type:
7155      llm_call:
7156        model: gpt-4.1
7157    config:
7158      prompt: |
7159        Classify this email into exactly one category:
7160        {{ input.email_text }}
7161"#;
7162
7163        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7164        let options = YamlWorkflowRunOptions {
7165            telemetry: YamlWorkflowTelemetryConfig {
7166                sample_rate: 0.5,
7167                ..YamlWorkflowTelemetryConfig::default()
7168            },
7169            trace: YamlWorkflowTraceOptions {
7170                context: Some(YamlWorkflowTraceContextInput {
7171                    trace_id: Some("trace-sample-deterministic".to_string()),
7172                    ..YamlWorkflowTraceContextInput::default()
7173                }),
7174                tenant: YamlWorkflowTraceTenantContext::default(),
7175            },
7176            model: None,
7177        };
7178
7179        let mut sampled_values = Vec::new();
7180        for _ in 0..3 {
7181            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7182                &workflow,
7183                &json!({"email_text":"hello"}),
7184                &MockExecutor,
7185                None,
7186                None,
7187                &options,
7188            )
7189            .await
7190            .expect("workflow should execute");
7191
7192            let sampled = output
7193                .metadata
7194                .as_ref()
7195                .and_then(|value| value.get("telemetry"))
7196                .and_then(|telemetry| telemetry.get("sampled"))
7197                .and_then(Value::as_bool)
7198                .expect("sampled flag should be present");
7199            sampled_values.push(sampled);
7200        }
7201
7202        assert!(sampled_values
7203            .iter()
7204            .all(|value| *value == sampled_values[0]));
7205    }
7206
7207    #[tokio::test]
7208    async fn workflow_run_options_reject_invalid_sample_rate() {
7209        let yaml = r#"
7210id: sample-rate-invalid
7211entry_node: classify
7212nodes:
7213  - id: classify
7214    node_type:
7215      llm_call:
7216        model: gpt-4.1
7217    config:
7218      prompt: |
7219        Classify this email into exactly one category:
7220        {{ input.email_text }}
7221"#;
7222
7223        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7224        let options = YamlWorkflowRunOptions {
7225            telemetry: YamlWorkflowTelemetryConfig {
7226                sample_rate: 1.1,
7227                ..YamlWorkflowTelemetryConfig::default()
7228            },
7229            ..YamlWorkflowRunOptions::default()
7230        };
7231
7232        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
7233            &workflow,
7234            &json!({"email_text":"hello"}),
7235            &MockExecutor,
7236            None,
7237            None,
7238            &options,
7239        )
7240        .await
7241        .expect_err("invalid sample_rate should fail");
7242
7243        match err {
7244            YamlWorkflowRunError::InvalidInput { message } => {
7245                assert!(message.contains("telemetry.sample_rate"));
7246            }
7247            _ => panic!("expected invalid input error for sample_rate"),
7248        }
7249    }
7250
7251    #[tokio::test]
7252    async fn workflow_run_options_reject_nan_sample_rate() {
7253        let yaml = r#"
7254id: sample-rate-invalid
7255entry_node: classify
7256nodes:
7257  - id: classify
7258    node_type:
7259      llm_call:
7260        model: gpt-4.1
7261    config:
7262      prompt: |
7263        Classify this email into exactly one category:
7264        {{ input.email_text }}
7265"#;
7266
7267        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7268        let options = YamlWorkflowRunOptions {
7269            telemetry: YamlWorkflowTelemetryConfig {
7270                sample_rate: f32::NAN,
7271                ..YamlWorkflowTelemetryConfig::default()
7272            },
7273            ..YamlWorkflowRunOptions::default()
7274        };
7275
7276        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
7277            &workflow,
7278            &json!({"email_text":"hello"}),
7279            &MockExecutor,
7280            None,
7281            None,
7282            &options,
7283        )
7284        .await
7285        .expect_err("nan sample_rate should fail");
7286
7287        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7288    }
7289
7290    #[test]
7291    fn subworkflow_options_inherit_parent_telemetry_configuration() {
7292        let mut parent_options = YamlWorkflowRunOptions::default();
7293        parent_options.telemetry.sample_rate = 0.0;
7294        parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
7295        parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
7296        parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
7297
7298        let parent_context = TraceContext {
7299            trace_id: Some("trace-parent".to_string()),
7300            span_id: Some("span-parent".to_string()),
7301            parent_span_id: None,
7302            traceparent: Some("00-trace-parent-span-parent-01".to_string()),
7303            tracestate: None,
7304            baggage: BTreeMap::new(),
7305        };
7306
7307        let subworkflow_options =
7308            build_subworkflow_options(&parent_options, Some(&parent_context), None);
7309
7310        assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
7311        assert_eq!(
7312            subworkflow_options.telemetry.payload_mode,
7313            YamlWorkflowPayloadMode::RedactedPayload
7314        );
7315        assert_eq!(
7316            subworkflow_options.telemetry.tool_trace_mode,
7317            YamlToolTraceMode::Redacted
7318        );
7319        assert_eq!(
7320            subworkflow_options.trace.tenant.conversation_id.as_deref(),
7321            Some("conv-123")
7322        );
7323        assert_eq!(
7324            subworkflow_options
7325                .trace
7326                .context
7327                .as_ref()
7328                .and_then(|ctx| ctx.trace_id.as_deref()),
7329            Some("trace-parent")
7330        );
7331    }
7332
7333    #[tokio::test]
7334    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
7335        let yaml = r#"
7336id: trace-options-test
7337entry_node: classify
7338nodes:
7339  - id: classify
7340    node_type:
7341      llm_call:
7342        model: gpt-4.1
7343    config:
7344      prompt: |
7345        Classify this email into exactly one category:
7346        {{ input.email_text }}
7347"#;
7348
7349        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7350        let options = YamlWorkflowRunOptions {
7351            telemetry: YamlWorkflowTelemetryConfig {
7352                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
7353                ..YamlWorkflowTelemetryConfig::default()
7354            },
7355            trace: YamlWorkflowTraceOptions {
7356                context: Some(YamlWorkflowTraceContextInput {
7357                    trace_id: Some("trace-fixed-123".to_string()),
7358                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
7359                    ..YamlWorkflowTraceContextInput::default()
7360                }),
7361                tenant: YamlWorkflowTraceTenantContext {
7362                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
7363                    ..YamlWorkflowTraceTenantContext::default()
7364                },
7365            },
7366            model: None,
7367        };
7368
7369        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7370            &workflow,
7371            &json!({"email_text":"hello"}),
7372            &MockExecutor,
7373            None,
7374            None,
7375            &options,
7376        )
7377        .await
7378        .expect("workflow should execute");
7379
7380        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
7381        assert_eq!(
7382            output
7383                .metadata
7384                .as_ref()
7385                .and_then(|value| value.get("telemetry"))
7386                .and_then(|telemetry| telemetry.get("payload_mode"))
7387                .and_then(Value::as_str),
7388            Some("redacted_payload")
7389        );
7390        assert_eq!(
7391            output
7392                .metadata
7393                .as_ref()
7394                .and_then(|value| value.get("trace"))
7395                .and_then(|trace| trace.get("tenant"))
7396                .and_then(|tenant| tenant.get("conversation_id"))
7397                .and_then(Value::as_str),
7398            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
7399        );
7400    }
7401
7402    #[test]
7403    fn streamed_payload_parser_extracts_last_json_object() {
7404        let raw = r#"{"state":"missing_scenario","reason":"ok"}
7405
7406extra reasoning text
7407
7408{"state":"ready","reason":"final"}"#;
7409
7410        let resolved = parse_streamed_structured_payload(raw, false)
7411            .expect("parser should extract final JSON object");
7412        assert_eq!(resolved.payload["state"], "ready");
7413        assert!(resolved.heal_confidence.is_none());
7414    }
7415
7416    #[test]
7417    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
7418        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7419
7420        let resolved = parse_streamed_structured_payload(raw, false)
7421            .expect("parser should recover final structured JSON object");
7422        assert_eq!(resolved.payload["state"], "ready");
7423    }
7424
7425    #[test]
7426    fn streamed_payload_parser_handles_markdown_with_heal() {
7427        let raw = r#"Some preface
7428```json
7429{
7430  "state": "missing_scenario",
7431  "reason": "Need more details"
7432}
7433```
7434Some trailing explanation"#;
7435
7436        let resolved = parse_streamed_structured_payload(raw, true)
7437            .expect("heal path should parse JSON block");
7438        assert_eq!(resolved.payload["state"], "missing_scenario");
7439        assert!(resolved.heal_confidence.is_some());
7440    }
7441
7442    #[test]
7443    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7444        let raw = "No JSON in this streamed output";
7445        let error = parse_streamed_structured_payload(raw, false)
7446            .expect_err("strict stream parse should fail without JSON candidate");
7447        assert!(error.contains("no JSON object candidate found"));
7448    }
7449
7450    #[test]
7451    fn include_raw_stream_debug_events_defaults_to_false() {
7452        let _guard = stream_debug_env_lock()
7453            .lock()
7454            .expect("stream debug env lock should not be poisoned");
7455        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7456        assert!(!include_raw_stream_debug_events());
7457    }
7458
7459    #[test]
7460    fn include_raw_stream_debug_events_accepts_truthy_values() {
7461        let _guard = stream_debug_env_lock()
7462            .lock()
7463            .expect("stream debug env lock should not be poisoned");
7464        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7465        assert!(include_raw_stream_debug_events());
7466        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7467    }
7468
7469    #[test]
7470    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7471        let mut filter = StructuredJsonDeltaFilter::default();
7472        let chunks = vec![
7473            "I will think first... ",
7474            "{\"state\":\"missing_scenario\",",
7475            "\"reason\":\"Need more details\"}",
7476            " additional commentary",
7477        ];
7478
7479        let filtered = chunks
7480            .into_iter()
7481            .filter_map(|chunk| filter.split(chunk).0)
7482            .collect::<String>();
7483
7484        assert_eq!(
7485            filtered,
7486            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7487        );
7488    }
7489
7490    #[test]
7491    fn structured_json_delta_filter_handles_braces_inside_strings() {
7492        let mut filter = StructuredJsonDeltaFilter::default();
7493        let chunks = vec![
7494            "preface ",
7495            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7496            " trailing",
7497        ];
7498
7499        let filtered = chunks
7500            .into_iter()
7501            .filter_map(|chunk| filter.split(chunk).0)
7502            .collect::<String>();
7503
7504        assert_eq!(
7505            filtered,
7506            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7507        );
7508    }
7509
7510    #[test]
7511    fn render_json_object_as_text_converts_top_level_fields() {
7512        let rendered =
7513            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7514        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7515
7516        assert_eq!(lines.len(), 3);
7517        assert!(lines.contains("question: q"));
7518        assert!(lines.contains("confidence: 0.8"));
7519        assert!(lines.contains("nested: {\"a\":1}"));
7520    }
7521
7522    #[test]
7523    fn stream_json_as_text_formatter_emits_once_when_complete() {
7524        let mut formatter = StreamJsonAsTextFormatter::default();
7525        formatter.push("{\"question\":\"hello\"}");
7526
7527        let first = formatter.emit_if_ready(true);
7528        let second = formatter.emit_if_ready(true);
7529
7530        assert_eq!(first, Some("question: hello".to_string()));
7531        assert_eq!(second, None);
7532    }
7533
7534    #[test]
7535    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7536        let expr = "$.nodes.classify.output.output_total == 1";
7537        let rewritten = rewrite_yaml_condition_to_ir(expr);
7538        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7539    }
7540
7541    #[tokio::test]
7542    async fn validates_workflow_input_before_ir_runtime_path() {
7543        let yaml = r#"
7544id: chat-workflow
7545entry_node: classify
7546nodes:
7547  - id: classify
7548    node_type:
7549      llm_call:
7550        model: gpt-4.1
7551    config:
7552      prompt: |
7553        classify
7554"#;
7555
7556        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7557        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
7558            .await
7559            .expect_err("non-object input should fail before execution");
7560
7561        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7562    }
7563
7564    #[test]
7565    fn interpolate_template_supports_dollar_prefixed_paths() {
7566        let context = json!({
7567            "input": {
7568                "email_text": "hello"
7569            }
7570        });
7571
7572        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7573        assert_eq!(rendered, "value=hello");
7574    }
7575}