// simple_agents_workflow/yaml_runner.rs
1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14    ToolFunction, ToolType,
15};
16use simple_agents_core::{
17    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24    flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29    WorkflowRuntimeOptions,
30};
31use crate::visualize::workflow_to_mermaid;
32
/// Synthetic node id injected as the entry point of lowered YAML workflows.
const YAML_START_NODE_ID: &str = "__yaml_start";
/// Synthetic tool id under which YAML LLM-call nodes are registered.
// NOTE(review): both ids are consumed by lowering/runtime code elsewhere in
// this file — confirm exact usage there.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

/// Process-wide counter mixed into generated trace ids (see
/// `generate_trace_id`) to reduce collisions between ids produced at the
/// same instant.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
37
/// Timing (and optional token usage) captured for a single executed node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    pub node_id: String,
    /// Node kind label; `workflow_nerdstats` matches this against "llm_call".
    pub node_kind: String,
    pub elapsed_ms: u128,
    // Token fields are omitted from JSON when the provider reported no usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}

/// Aggregated metrics for one LLM node. Unlike `YamlStepTiming`, the token
/// counts here are required, so this is only built when usage is known.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    pub tokens_per_second: f64,
}

/// Full result of executing a YAML-defined workflow: per-node outputs and
/// trace, timing and token accounting, and optional trace metadata.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    // NOTE(review): appears to carry the run's primary text payload —
    // confirm semantics against the runner implementation.
    pub email_text: String,
    /// Node ids recorded during the run (presumably in visit order — verify).
    pub trace: Vec<String>,
    /// Per-node output payloads keyed by node id (BTreeMap for stable JSON).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub total_elapsed_ms: u128,
    /// Time to first token, when streaming exposed it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    /// `None` when no provider reported thinking tokens (distinct from 0).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_thinking_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
91
/// Controls whether span payloads are recorded verbatim or reduced to a
/// type-only marker (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}

/// Redaction policy for tool-call traces (see `payload_for_tool_trace`):
/// record payloads fully, as type markers only, or not at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
108
/// Caller-supplied distributed-tracing context. Every field is optional;
/// absent fields fall back to other sources during resolution (see
/// `resolve_run_trace_id_with_source` and `merged_trace_context_for_worker`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// W3C `traceparent` header value; parsed by `trace_id_from_traceparent`.
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// W3C-style baggage entries propagated to workers.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}

/// Optional multi-tenant identifiers attached to spans and run metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
138
/// Telemetry knobs for a run. The serde field defaults mirror the hand-written
/// `Default` impl below: everything enabled, full payloads, sample rate 1.0,
/// 30-day retention.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch; when false, no trace id is resolved for the run.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Presumably gates the `workflow_nerdstats` diagnostic blob — confirm.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Fraction of traces to sample; validated to [0.0, 1.0] by
    /// `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    /// Retention reported in run metadata; enforcement happens elsewhere.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
156
157impl Default for YamlWorkflowTelemetryConfig {
158    fn default() -> Self {
159        Self {
160            enabled: true,
161            nerdstats: true,
162            sample_rate: 1.0,
163            payload_mode: YamlWorkflowPayloadMode::FullPayload,
164            retention_days: 30,
165            multi_tenant: true,
166            tool_trace_mode: YamlToolTraceMode::Full,
167        }
168    }
169}
170
/// Trace-related run options: optional propagation context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}

/// Top-level options for one YAML workflow run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    /// Optional model override; interpreted by the runner elsewhere.
    #[serde(default)]
    pub model: Option<String>,
}
188
/// Provider-reported token usage for a single LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning ("thinking") tokens, when the provider reports them.
    pub thinking_tokens: Option<u32>,
}

/// Outcome of one LLM execution: the parsed payload plus optional usage,
/// time-to-first-token, and any tool calls made along the way.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}

/// Record of a single tool invocation made during an LLM node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    pub arguments: Value,
    // NOTE(review): `None` semantics (failure vs. tracing off) are set by the
    // executor elsewhere — confirm.
    pub output: Option<Value>,
    /// Producer-defined status label.
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
215
/// Running token totals across all LLM calls in a run (u64 so summing many
/// per-call u32 counters cannot overflow the accumulator type).
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    /// Stays `None` until at least one call reports thinking tokens, so
    /// "never reported" is distinguishable from a reported 0.
    thinking_tokens: Option<u64>,
}
223
224impl YamlTokenTotals {
225    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
226        self.input_tokens += u64::from(usage.prompt_tokens);
227        self.output_tokens += u64::from(usage.completion_tokens);
228        self.total_tokens += u64::from(usage.total_tokens);
229
230        if let Some(thinking_tokens) = usage.thinking_tokens {
231            let next = self.thinking_tokens.unwrap_or(0) + u64::from(thinking_tokens);
232            self.thinking_tokens = Some(next);
233        }
234    }
235
236    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
237        if elapsed_ms == 0 {
238            return 0.0;
239        }
240        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
241    }
242}
243
/// Rounds `value` to two decimal places using `f64::round` semantics.
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}

/// Completion tokens per second over `elapsed_ms`, rounded to two decimals.
/// A zero elapsed time yields 0.0 instead of dividing by zero.
fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
    match elapsed_ms {
        0 => 0.0,
        ms => round_two_decimals(f64::from(completion_tokens) * 1000.0 / (ms as f64)),
    }
}
254
// Serde `default = "..."` helpers for `YamlWorkflowTelemetryConfig`; keep
// these values in sync with that struct's `Default` impl.
fn default_true() -> bool {
    true
}

fn default_sample_rate() -> f32 {
    1.0
}

fn default_retention_days() -> u32 {
    30
}
266
267fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
268    if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
269        return Ok(());
270    }
271
272    Err(YamlWorkflowRunError::InvalidInput {
273        message: format!(
274            "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
275        ),
276    })
277}
278
/// Deterministically decides whether a trace id falls inside the sample rate.
///
/// Hashes the trace id with FNV-1a and compares the hash, normalized to
/// [0, 1), against `sample_rate`, so a given trace id always receives the
/// same decision. Rates >= 1.0 always sample; rates <= 0.0 never do.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a over the raw bytes of the trace id.
    let hash = trace_id
        .as_bytes()
        .iter()
        .fold(0xcbf29ce484222325u64, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
        });

    ((hash as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
296
/// Extracts the trace id from a W3C `traceparent` header value.
///
/// Expects exactly `version-trace_id-span_id-flags`. Returns `None` when the
/// header is malformed per the W3C Trace Context spec: wrong field count,
/// wrong field lengths, non-hex fields, the forbidden version `ff`, or an
/// all-zero trace id / span id. The returned trace id is normalized to
/// lowercase hex.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let mut parts = traceparent.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if parts.next().is_some() {
        return None;
    }

    if version.len() != 2 || trace_id.len() != 32 || span_id.len() != 16 || flags.len() != 2 {
        return None;
    }

    let all_hex = |field: &str| field.chars().all(|ch| ch.is_ascii_hexdigit());
    if !all_hex(version) || !all_hex(trace_id) || !all_hex(span_id) || !all_hex(flags) {
        return None;
    }

    // Version 0xff is explicitly invalid, and all-zero ids mean "no trace".
    if version.eq_ignore_ascii_case("ff") {
        return None;
    }
    if trace_id.chars().all(|ch| ch == '0') || span_id.chars().all(|ch| ch == '0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
323
/// Where a run's trace id ultimately came from; surfaced in run metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry disabled — no trace id at all.
    Disabled,
    /// Taken directly from the caller-provided trace context.
    ExplicitTraceId,
    /// Inherited from the parent span's context.
    ParentTraceId,
    /// Parsed out of a caller-provided `traceparent` header.
    Traceparent,
    /// Parsed out of the parent span's `traceparent` header.
    ParentTraceparent,
    /// Freshly generated for this run.
    Generated,
}

/// Result of trace-id resolution plus the sampling decision for the run.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
340
341fn resolve_run_trace_id_with_source(
342    options: &YamlWorkflowRunOptions,
343    parent_trace_context: Option<&TraceContext>,
344) -> (Option<String>, TraceIdSource) {
345    if !options.telemetry.enabled {
346        return (None, TraceIdSource::Disabled);
347    }
348
349    if let Some(trace_id) = options
350        .trace
351        .context
352        .as_ref()
353        .and_then(|context| context.trace_id.clone())
354    {
355        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
356    }
357
358    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
359        return (Some(trace_id), TraceIdSource::ParentTraceId);
360    }
361
362    if let Some(trace_id) = options
363        .trace
364        .context
365        .as_ref()
366        .and_then(|context| context.traceparent.as_deref())
367        .and_then(trace_id_from_traceparent)
368    {
369        return (Some(trace_id), TraceIdSource::Traceparent);
370    }
371
372    if let Some(trace_id) = parent_trace_context
373        .and_then(|context| context.traceparent.as_deref())
374        .and_then(trace_id_from_traceparent)
375    {
376        return (Some(trace_id), TraceIdSource::ParentTraceparent);
377    }
378
379    (Some(generate_trace_id()), TraceIdSource::Generated)
380}
381
382fn resolve_telemetry_context(
383    options: &YamlWorkflowRunOptions,
384    parent_trace_context: Option<&TraceContext>,
385) -> ResolvedTelemetryContext {
386    let (trace_id, trace_id_source) =
387        resolve_run_trace_id_with_source(options, parent_trace_context);
388    let sampled = trace_id
389        .as_deref()
390        .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
391        .unwrap_or(false);
392
393    ResolvedTelemetryContext {
394        trace_id,
395        sampled,
396        trace_id_source,
397    }
398}
399
400fn generate_trace_id() -> String {
401    use std::time::{SystemTime, UNIX_EPOCH};
402
403    let now_nanos = SystemTime::now()
404        .duration_since(UNIX_EPOCH)
405        .map(|duration| duration.as_nanos())
406        .unwrap_or(0);
407    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
408    format!("{:032x}", now_nanos ^ sequence)
409}
410
411fn workflow_metadata_with_trace(
412    options: &YamlWorkflowRunOptions,
413    trace_id: &str,
414    sampled: bool,
415    trace_id_source: TraceIdSource,
416) -> Value {
417    json!({
418        "telemetry": {
419            "trace_id": trace_id,
420            "trace_id_source": match trace_id_source {
421                TraceIdSource::Disabled => "disabled",
422                TraceIdSource::ExplicitTraceId => "explicit_trace_id",
423                TraceIdSource::ParentTraceId => "parent_trace_id",
424                TraceIdSource::Traceparent => "traceparent",
425                TraceIdSource::ParentTraceparent => "parent_traceparent",
426                TraceIdSource::Generated => "generated",
427            },
428            "enabled": options.telemetry.enabled,
429            "sampled": sampled,
430            "nerdstats": options.telemetry.nerdstats,
431            "sample_rate": options.telemetry.sample_rate,
432            "payload_mode": match options.telemetry.payload_mode {
433                YamlWorkflowPayloadMode::FullPayload => "full_payload",
434                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
435            },
436            "retention_days": options.telemetry.retention_days,
437            "multi_tenant": options.telemetry.multi_tenant,
438            "tool_trace_mode": match options.telemetry.tool_trace_mode {
439                YamlToolTraceMode::Full => "full",
440                YamlToolTraceMode::Redacted => "redacted",
441                YamlToolTraceMode::Off => "off",
442            },
443        },
444        "trace": {
445            "tenant": {
446                "workspace_id": options.trace.tenant.workspace_id,
447                "user_id": options.trace.tenant.user_id,
448                "conversation_id": options.trace.tenant.conversation_id,
449                "request_id": options.trace.tenant.request_id,
450                "run_id": options.trace.tenant.run_id,
451            }
452        },
453    })
454}
455
456fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
457    if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
458        span.set_attribute("tenant.workspace_id", workspace_id);
459    }
460    if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
461        span.set_attribute("tenant.user_id", user_id);
462    }
463    if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
464        span.set_attribute("tenant.conversation_id", conversation_id);
465    }
466    if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
467        span.set_attribute("tenant.request_id", request_id);
468    }
469    if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
470        span.set_attribute("tenant.run_id", run_id);
471    }
472}
473
474fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
475    let llm_nodes_without_usage: Vec<String> = output
476        .step_timings
477        .iter()
478        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
479        .map(|step| step.node_id.clone())
480        .collect();
481    let token_metrics_available = llm_nodes_without_usage.is_empty();
482    let token_metrics_source = if token_metrics_available {
483        "provider_usage"
484    } else {
485        "provider_stream_usage_unavailable"
486    };
487    let total_input_tokens = if token_metrics_available {
488        json!(output.total_input_tokens)
489    } else {
490        Value::Null
491    };
492    let total_output_tokens = if token_metrics_available {
493        json!(output.total_output_tokens)
494    } else {
495        Value::Null
496    };
497    let total_tokens = if token_metrics_available {
498        json!(output.total_tokens)
499    } else {
500        Value::Null
501    };
502    let total_thinking_tokens = if token_metrics_available {
503        json!(output.total_thinking_tokens)
504    } else {
505        Value::Null
506    };
507    let tokens_per_second = if token_metrics_available {
508        json!(output.tokens_per_second)
509    } else {
510        Value::Null
511    };
512
513    json!({
514        "workflow_id": output.workflow_id,
515        "terminal_node": output.terminal_node,
516        "total_elapsed_ms": output.total_elapsed_ms,
517        "ttft_ms": output.ttft_ms,
518        "step_timings": output.step_timings,
519        "llm_node_metrics": output.llm_node_metrics,
520        "total_input_tokens": total_input_tokens,
521        "total_output_tokens": total_output_tokens,
522        "total_tokens": total_tokens,
523        "total_thinking_tokens": total_thinking_tokens,
524        "tokens_per_second": tokens_per_second,
525        "trace_id": output.trace_id,
526        "token_metrics_available": token_metrics_available,
527        "token_metrics_source": token_metrics_source,
528        "llm_nodes_without_usage": llm_nodes_without_usage,
529    })
530}
531
532fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
533    match mode {
534        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
535        YamlWorkflowPayloadMode::RedactedPayload => json!({
536            "redacted": true,
537            "value_type": match payload {
538                Value::Null => "null",
539                Value::Bool(_) => "bool",
540                Value::Number(_) => "number",
541                Value::String(_) => "string",
542                Value::Array(_) => "array",
543                Value::Object(_) => "object",
544            }
545        })
546        .to_string(),
547    }
548}
549
550fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
551    match mode {
552        YamlToolTraceMode::Full => payload.clone(),
553        YamlToolTraceMode::Redacted => json!({
554            "redacted": true,
555            "value_type": json_type_name(payload),
556        }),
557        YamlToolTraceMode::Off => Value::Null,
558    }
559}
560
561fn validate_json_schema(schema: &Value) -> Result<(), String> {
562    jsonschema::JSONSchema::compile(schema)
563        .map(|_| ())
564        .map_err(|error| format!("invalid JSON schema: {error}"))
565}
566
567fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
568    let validator = jsonschema::JSONSchema::compile(schema)
569        .map_err(|error| format!("invalid JSON schema: {error}"))?;
570    if let Err(errors) = validator.validate(instance) {
571        let message = errors
572            .into_iter()
573            .next()
574            .map(|error| error.to_string())
575            .unwrap_or_else(|| "unknown schema validation error".to_string());
576        return Err(format!("schema validation failed: {message}"));
577    }
578    Ok(())
579}
580
581fn schema_type(schema: &Value) -> Option<&str> {
582    schema.get("type").and_then(Value::as_str)
583}
584
585fn schema_expects_object(schema: &Value) -> bool {
586    schema_type(schema) == Some("object")
587}
588
589fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
590    options.trace.context.as_ref().map(|input| TraceContext {
591        trace_id: input.trace_id.clone(),
592        span_id: input.span_id.clone(),
593        parent_span_id: input.parent_span_id.clone(),
594        traceparent: input.traceparent.clone(),
595        tracestate: input.tracestate.clone(),
596        baggage: input.baggage.clone(),
597    })
598}
599
/// Builds the trace context handed to a custom worker by layering three
/// sources, highest precedence first: the active span's context, the
/// already-resolved run trace id (used for `trace_id` only), and the
/// caller-supplied context from the run options.
///
/// Each field falls back independently, so e.g. a span context without a
/// `tracestate` still picks up the input context's `tracestate`.
fn merged_trace_context_for_worker(
    span_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
    options: &YamlWorkflowRunOptions,
) -> TraceContext {
    let input_context = options.trace.context.as_ref();
    // Baggage is all-or-nothing per source: non-empty span baggage wins
    // outright; otherwise the input baggage (or empty) is used. Entries are
    // never merged across sources.
    let baggage = if let Some(context) = span_context {
        if !context.baggage.is_empty() {
            context.baggage.clone()
        } else {
            input_context
                .map(|value| value.baggage.clone())
                .unwrap_or_default()
        }
    } else {
        input_context
            .map(|value| value.baggage.clone())
            .unwrap_or_default()
    };

    TraceContext {
        // trace_id alone has a middle-priority fallback: the resolved run id.
        trace_id: span_context
            .and_then(|context| context.trace_id.clone())
            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
        span_id: span_context
            .and_then(|context| context.span_id.clone())
            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
        parent_span_id: span_context
            .and_then(|context| context.parent_span_id.clone())
            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
        traceparent: span_context
            .and_then(|context| context.traceparent.clone())
            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
        tracestate: span_context
            .and_then(|context| context.tracestate.clone())
            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
        baggage,
    }
}
640
641fn custom_worker_context_with_trace(
642    context: &Value,
643    trace_context: &TraceContext,
644    tenant_context: &YamlWorkflowTraceTenantContext,
645) -> Value {
646    let mut context_with_trace = context.clone();
647    let Some(root) = context_with_trace.as_object_mut() else {
648        return context_with_trace;
649    };
650
651    root.insert(
652        "trace".to_string(),
653        json!({
654            "context": {
655                "trace_id": trace_context.trace_id,
656                "span_id": trace_context.span_id,
657                "parent_span_id": trace_context.parent_span_id,
658                "traceparent": trace_context.traceparent,
659                "tracestate": trace_context.tracestate,
660                "baggage": trace_context.baggage,
661            },
662            "tenant": {
663                "workspace_id": tenant_context.workspace_id,
664                "user_id": tenant_context.user_id,
665                "conversation_id": tenant_context.conversation_id,
666                "request_id": tenant_context.request_id,
667                "run_id": tenant_context.run_id,
668            }
669        }),
670    );
671
672    context_with_trace
673}
674
/// Reads `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` and accepts the common
/// truthy spellings "1", "true", "yes", "on" (case-insensitive, surrounding
/// whitespace ignored). Unset or any other value means raw stream debug
/// events stay off.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
684
/// Final payload recovered from a streamed structured completion, plus the
/// healing parser's confidence when lenient parsing was used.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    /// `Some` only when `JsonishParser` healing produced the payload.
    heal_confidence: Option<f32>,
}

/// Accumulates streamed JSON text and renders it as plain text exactly once
/// when the stream completes (see `emit_if_ready`).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    /// Concatenated raw chunks received so far.
    raw_json: String,
    /// Set after the rendered text has been emitted, preventing re-emission.
    emitted: bool,
}
696
697impl StreamJsonAsTextFormatter {
698    fn push(&mut self, chunk: &str) {
699        self.raw_json.push_str(chunk);
700    }
701
702    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
703        if self.emitted || !complete {
704            return None;
705        }
706        self.emitted = true;
707        Some(render_json_object_as_text(self.raw_json.as_str()))
708    }
709}
710
711fn render_json_object_as_text(raw_json: &str) -> String {
712    let value = match serde_json::from_str::<Value>(raw_json) {
713        Ok(value) => value,
714        Err(_) => return raw_json.to_string(),
715    };
716    let Some(object) = value.as_object() else {
717        return raw_json.to_string();
718    };
719
720    let mut lines = Vec::with_capacity(object.len());
721    for (key, value) in object {
722        let rendered = match value {
723            Value::String(text) => text.clone(),
724            _ => value.to_string(),
725        };
726        lines.push(format!("{key}: {rendered}"));
727    }
728    lines.join("\n")
729}
730
/// Distinguishes user-visible output tokens from "thinking" tokens in
/// streamed workflow events.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}

/// Streaming state machine that splits deltas into the first balanced JSON
/// object (output) and all surrounding text (thinking). See `split`.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    /// True once the opening '{' of the JSON object has been seen.
    started: bool,
    /// True once the object's braces balanced; later text is thinking.
    completed: bool,
    /// Current brace nesting depth inside the object.
    depth: u32,
    /// True while inside a JSON string literal (braces there don't count).
    in_string: bool,
    /// True when the previous char inside a string was a backslash escape.
    escape: bool,
}
746
impl StructuredJsonDeltaFilter {
    /// Splits one streamed delta into `(output, thinking)` fragments.
    ///
    /// Text before the first '{' and anything after the object's braces
    /// balance is classified as thinking; the first balanced `{...}` object
    /// itself is output. JSON string literals are tracked (with backslash
    /// escapes) so braces inside strings don't affect nesting. Each side of
    /// the returned pair is `None` when that fragment is empty. State
    /// persists across calls, so the object may be split across deltas.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            // After the object closed, everything is trailing "thinking".
            if self.completed {
                thinking.push(ch);
                continue;
            }

            // Before the opening brace, everything is leading "thinking".
            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            // Inside a string literal: only watch for escapes and the
            // closing quote; braces here are data, not structure.
            if self.in_string {
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    if self.depth == 0 {
                        // Braces balanced: the structured object is complete.
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// True once the structured object's braces have balanced.
    fn completed(&self) -> bool {
        self.completed
    }
}
822
/// Pulls the contents of the last ```json fenced block out of `raw`.
///
/// Returns `None` when no fence is present, the opening fence is never
/// closed, or the fenced body is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    let (_, after_fence) = raw.rsplit_once("```json")?;
    let (body, _) = after_fence.split_once("```")?;
    let candidate = body.trim();
    (!candidate.is_empty()).then_some(candidate)
}
833
/// Extracts the balanced `{ ... }` object beginning at `start_index`.
///
/// Scans forward tracking string literals (with backslash-escape handling)
/// so braces inside strings don't affect nesting. Returns `None` when the
/// braces never balance — or when `start_index` is out of range or not a
/// char boundary, instead of panicking on the slice.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    // Defensive: a bad start index yields None rather than a slice panic.
    let tail = raw.get(start_index..)?;

    let mut depth = 0u32;
    let mut in_string = false;
    let mut escape = false;

    for (relative_index, ch) in tail.char_indices() {
        if in_string {
            if escape {
                escape = false;
                continue;
            }
            if ch == '\\' {
                escape = true;
                continue;
            }
            if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                if depth == 0 {
                    // A closer before any opener: no balanced object here.
                    return None;
                }
                depth -= 1;
                if depth == 0 {
                    let end = relative_index + ch.len_utf8();
                    return Some(tail[..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
874
875fn extract_last_parsable_object(raw: &str) -> Option<&str> {
876    let starts: Vec<usize> = raw
877        .char_indices()
878        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
879        .collect();
880
881    for start in starts.into_iter().rev() {
882        let Some(candidate) = extract_balanced_object_from(raw, start) else {
883            continue;
884        };
885        if serde_json::from_str::<Value>(candidate).is_ok() {
886            return Some(candidate);
887        }
888    }
889
890    None
891}
892
893fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
894    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
895}
896
897fn parse_streamed_structured_payload(
898    raw: &str,
899    heal: bool,
900) -> Result<StreamedPayloadResolution, String> {
901    if !heal {
902        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
903            return Ok(StreamedPayloadResolution {
904                payload,
905                heal_confidence: None,
906            });
907        }
908
909        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
910            "failed to parse streamed structured completion JSON: no JSON object candidate found"
911                .to_string()
912        })?;
913        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
914            format!(
915                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
916            )
917        })?;
918        return Ok(StreamedPayloadResolution {
919            payload,
920            heal_confidence: None,
921        });
922    }
923
924    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
925    let parser = JsonishParser::new();
926    let healed = parser
927        .parse(candidate)
928        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;
929
930    Ok(StreamedPayloadResolution {
931        payload: healed.value,
932        heal_confidence: Some(healed.confidence),
933    })
934}
935
/// Flattened event emitted to a [`YamlWorkflowEventSink`] during a run.
///
/// A single struct covers every event shape; `event_type` discriminates, and
/// fields that do not apply to a given shape are `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Discriminator for the event shape (exact vocabulary defined by the emitter).
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    /// Human-readable message, when the event carries one.
    pub message: Option<String>,
    /// Incremental streamed text, for token events.
    pub delta: Option<String>,
    pub token_kind: Option<YamlWorkflowTokenKind>,
    // NOTE(review): presumably marks tokens coming from the workflow's final
    // node — confirm against the emitter.
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    /// Free-form extra data attached by the emitter.
    pub metadata: Option<Value>,
}
950
/// Role of a workflow chat message; alias of the core [`Role`] type.
pub type WorkflowMessageRole = Role;

/// One chat message supplied to (or produced by) a workflow LLM call.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    /// Optional participant name; missing input fields deserialize to `None`.
    #[serde(default)]
    pub name: Option<String>,
    /// Tool-call correlation id; accepts both `tool_call_id` and `toolCallId`
    /// spellings on input.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
962
/// Record of one template expression resolved against workflow state.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of this binding within the template.
    pub index: usize,
    /// The raw expression as written in the template.
    pub expression: String,
    /// Path the expression resolved against.
    pub source_path: String,
    /// Value the expression resolved to.
    pub resolved: Value,
    /// Type name of the resolved value.
    pub resolved_type: String,
    /// True when the expression did not resolve to a value.
    pub missing: bool,
}
972
/// Severity level attached to a [`YamlWorkflowDiagnostic`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}

/// A single validation finding, optionally tied to a specific node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Node the finding refers to; `None` for workflow-level findings.
    pub node_id: Option<String>,
    /// Machine-readable diagnostic code (semantics defined by the validator).
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    pub message: String,
}
986
/// Errors surfaced while loading, validating, or executing a YAML workflow.
///
/// The `#[error]` attributes define the user-facing messages; variants group
/// roughly into load/parse failures, graph-shape problems, and execution
/// failures reported by executors.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    // --- load/parse ---
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    // --- graph shape / validation ---
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    // --- execution ---
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1031
/// Receiver for [`YamlWorkflowEvent`]s emitted during a workflow run.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Deliver one event to the sink.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Whether the consumer requested cancellation; defaults to `false`.
    fn is_cancelled(&self) -> bool {
        false
    }
}

/// Event sink that discards every event and never cancels.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1045
/// Canonical message used when a workflow event sink reports cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const MESSAGE: &str = "workflow event callback cancelled";
    MESSAGE
}
1049
1050fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1051    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1052}
1053
/// Errors produced when converting a YAML workflow to the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1063
1064/// Render a YAML workflow graph as Mermaid flowchart.
1065pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1066    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1067        return workflow_to_mermaid(&ir);
1068    }
1069
1070    yaml_workflow_to_mermaid_fallback(workflow)
1071}
1072
/// Direct YAML→Mermaid rendering used when IR conversion is not possible.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    // Tool satellite nodes get a shared style class at the end; ids collected here.
    let mut tool_node_ids: Vec<String> = Vec::new();

    for node in &workflow.nodes {
        // Node box labelled "id\n(kind)".
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "  {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        // Each declared tool of an llm_call node becomes a dashed satellite node.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "  {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "  {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Deduplicate edges as (from, label, to); plain edges use an empty label.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labelled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output (HashSet iteration order is not stable).
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "  {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "  {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    if !tool_node_ids.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1154
1155fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1156    llm.tools
1157        .iter()
1158        .map(|tool| match tool {
1159            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1160            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1161        })
1162        .collect()
1163}
1164
/// Load a YAML workflow file and render it as Mermaid flowchart.
///
/// When the workflow references sibling workflows (via `run_workflow_graph`
/// tools), those are discovered next to the file and rendered as nested
/// Mermaid subgraphs.
///
/// # Errors
/// Returns [`YamlWorkflowRunError::Read`] / [`YamlWorkflowRunError::Parse`]
/// when this file or a referenced sibling cannot be read or parsed.
pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
    if referenced_subgraphs.is_empty() {
        // No subworkflows: render the plain single-graph chart.
        return Ok(yaml_workflow_to_mermaid(&workflow));
    }

    Ok(yaml_workflow_to_mermaid_with_subgraphs(
        &workflow,
        &referenced_subgraphs,
    ))
}
1189
/// A sibling workflow referenced by the main workflow, kept with the alias
/// the main workflow used to reference it.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    alias: String,
    workflow: YamlWorkflow,
}

/// Intermediate rendering of one workflow as Mermaid lines (without the
/// surrounding `subgraph`/`end` wrapper).
#[derive(Debug, Default)]
struct MermaidBlockRender {
    lines: Vec<String>,
    /// Ids of tool satellite nodes, for the shared `toolNode` class.
    tool_node_ids: Vec<String>,
    /// Tool nodes named `run_workflow_graph`, linked to subgraph entries.
    run_workflow_tool_node_ids: Vec<String>,
    /// Prefixed Mermaid id of this workflow's entry node.
    entry_node_id: String,
}
1203
/// Render the main workflow plus referenced sibling workflows as one Mermaid
/// chart, each inside its own `subgraph` cluster.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Main workflow cluster.
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "  subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("    {line}"));
    }
    lines.push("  end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "  subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("    {line}"));
        }
        lines.push("  end".to_string());

        // Link every `run_workflow_graph` tool in the main block to this
        // subgraph's entry node with a dashed "calls" edge.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "  {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    // Shared styling for all tool satellite nodes across clusters.
    if !all_tool_nodes.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1254
/// Render one workflow's nodes/edges as Mermaid lines, namespacing every node
/// id with `prefix` so several workflows can share a single chart.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        // Node box labelled "id\n(kind)".
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        // Declared tools become dashed satellite nodes; `run_workflow_graph`
        // tools are tracked separately so callers can link them to subgraphs.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // Deduplicate edges as (from, label, to); plain edges use an empty label.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labelled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output (HashSet iteration order is not stable).
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1330
/// Find sibling YAML workflows that this workflow references by id.
///
/// Ids mentioned via `run_workflow_graph` tools (schema consts/enums or
/// prompt text) are matched against YAML files in the same directory, using
/// normalized (lowercase, `-`→`_`) lookup keys. Ids with no matching sibling
/// are silently skipped.
fn discover_referenced_subgraphs(
    workflow_path: &Path,
    workflow: &YamlWorkflow,
) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
    let workflow_ids = referenced_workflow_ids(workflow);
    if workflow_ids.is_empty() {
        // Nothing referenced: avoid scanning the directory at all.
        return Ok(Vec::new());
    }

    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;

    let mut discovered = Vec::new();
    let mut seen = HashSet::new();

    for workflow_id in workflow_ids {
        let normalized = normalize_workflow_lookup_key(&workflow_id);
        if seen.contains(&normalized) {
            continue;
        }

        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
        {
            discovered.push(MermaidSubgraphWorkflow {
                // Keep the original spelling as the display alias.
                alias: workflow_id.clone(),
                workflow: subworkflow.clone(),
            });
            seen.insert(normalized);
        }
    }

    Ok(discovered)
}
1364
/// Load every other YAML workflow in `parent_dir`, keyed for lookup.
///
/// Each sibling is registered under both its file stem and its declared
/// workflow id (normalized), so references can use either form. The file at
/// `workflow_path` itself is skipped.
///
/// # Errors
/// Read/parse failures on the directory or any sibling file are returned as
/// [`YamlWorkflowRunError::Read`] / [`YamlWorkflowRunError::Parse`].
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // Do not load the workflow being rendered as its own sibling.
        if path == workflow_path {
            continue;
        }

        let contents =
            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
                path: path.display().to_string(),
                source,
            })?;
        let subworkflow: YamlWorkflow =
            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
                path: path.display().to_string(),
                source,
            })?;

        // Register under the file stem first, then under the declared id.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1408
/// Collect workflow ids referenced through `run_workflow_graph` tools.
///
/// Ids come from each such tool's JSON schema (`const`/`enum` of
/// `workflow_id`) and, for nodes carrying such a tool, from
/// `"workflow_id": "..."` snippets in the node's prompt text. Results are
/// deduplicated by normalized key but returned in their original spelling
/// and discovery order.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                // Ids pinned in the tool's own schema.
                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    let normalized = normalize_workflow_lookup_key(&workflow_id);
                    if seen.insert(normalized) {
                        ids.push(workflow_id);
                    }
                }

                // Ids mentioned in the node prompt. This runs once per
                // matching tool, but `seen` keeps the output deduplicated.
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        let normalized = normalize_workflow_lookup_key(&workflow_id);
                        if seen.insert(normalized) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1446
1447fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1448    let schema = match tool {
1449        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1450        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1451    };
1452
1453    let mut ids = Vec::new();
1454    if let Some(schema) = schema {
1455        if let Some(workflow_prop) = schema
1456            .get("properties")
1457            .and_then(Value::as_object)
1458            .and_then(|properties| properties.get("workflow_id"))
1459        {
1460            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1461                ids.push(value.to_string());
1462            }
1463
1464            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1465                for value in enum_values {
1466                    if let Some(value) = value.as_str() {
1467                        ids.push(value.to_string());
1468                    }
1469                }
1470            }
1471        }
1472    }
1473
1474    ids
1475}
1476
/// Scan free-form prompt text for `"workflow_id": "<id>"` style references.
///
/// This is a lightweight lexical scan, not a JSON parse: it walks each
/// `"workflow_id"` key, skips to the following `:`, and collects the next
/// double-quoted value when one is present. Duplicates are not removed here;
/// the caller deduplicates.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";
    let mut ids = Vec::new();
    let mut search = prompt;

    loop {
        let Some(index) = search.find(KEY) else {
            break;
        };
        let remainder = &search[index + KEY.len()..];
        let Some(colon_index) = remainder.find(':') else {
            // Key without a colon after it: nothing more to extract.
            break;
        };
        let after_colon = &remainder[colon_index + 1..];

        // Only a double-quoted value directly after the colon counts.
        let quoted = after_colon
            .trim_start()
            .strip_prefix('"')
            .and_then(|rest| rest.find('"').map(|end| (rest, end)));

        match quoted {
            Some((rest, end_quote_index)) => {
                let workflow_id = rest[..end_quote_index].trim();
                if !workflow_id.is_empty() {
                    ids.push(workflow_id.to_string());
                }
                search = &rest[end_quote_index + 1..];
            }
            // Unquoted or unterminated value: resume after the colon.
            None => search = after_colon,
        }
    }

    ids
}
1504
/// Normalize a workflow id or file stem for case- and dash-insensitive lookup.
///
/// ASCII letters are lowercased and `-` maps to `_`, so `My-Flow`,
/// `my_flow`, and `MY-FLOW` all share one lookup key.
fn normalize_workflow_lookup_key(value: &str) -> String {
    let mut key = String::with_capacity(value.len());
    for ch in value.chars() {
        key.push(if ch == '-' { '_' } else { ch.to_ascii_lowercase() });
    }
    key
}
1514
/// Returns true when `path` has a `.yaml` or `.yml` extension (case-sensitive).
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1521
1522fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1523    sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1524}
1525
/// Name of a tool declaration, regardless of declaration flavor.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1532
/// Convert a YAML workflow into the canonical IR [`WorkflowDefinition`].
///
/// A synthetic start node (`__yaml_start`) is prepended to point at the YAML
/// entry node. `llm_call` nodes become IR `Tool` nodes backed by the internal
/// `__yaml_llm_call` tool, `custom_worker` nodes become `Tool` nodes for
/// their handler, and `switch` nodes become IR `Router` nodes.
///
/// # Errors
/// Returns [`YamlToIrError`] when the entry node is missing, a node has more
/// than one outgoing edge (IR llm/tool nodes carry at most one `next`), or a
/// node uses features the IR cannot express yet (llm tools,
/// set_globals/update_globals, or an unrecognized node type).
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency map of declared YAML edges, used to resolve each node's `next`.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    // Synthetic IR start node routing into the YAML entry node.
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet; refuse to convert.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            // Tool-calling llm nodes are likewise not expressible in IR yet.
            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // The llm node is encoded as an IR Tool node whose input captures
            // the full llm configuration for the internal `__yaml_llm_call`.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as for llm nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    // The node's config payload is the tool input; `{}` when absent.
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch branches become router routes with rewritten conditions.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1680
1681fn single_next_for_node(
1682    outgoing: &HashMap<&str, Vec<&str>>,
1683    node_id: &str,
1684) -> Result<Option<String>, YamlToIrError> {
1685    match outgoing.get(node_id) {
1686        None => Ok(None),
1687        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1688        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1689            node_id: node_id.to_string(),
1690        }),
1691    }
1692}
1693
/// Translate a YAML switch condition path into its canonical IR equivalent.
///
/// YAML conditions address node results via `$.nodes.<id>.output...`; the IR
/// exposes them under `$.node_outputs.<id>...`. This rewrites the prefix and
/// drops the `.output` segment in both its infix and trailing forms.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    match rewritten.strip_suffix(".output") {
        Some(prefix) => prefix.to_string(),
        None => rewritten,
    }
}
1704
/// Sanitize an arbitrary node id into a Mermaid-safe identifier.
///
/// Ids must start with an ASCII letter or underscore; anything else (including
/// the empty string) gets an `n_` prefix. Every character outside
/// `[A-Za-z0-9_]` is then replaced by `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_ok = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');
    let prefix = if starts_ok { "" } else { "n_" };
    prefix
        .chars()
        .chain(id.chars())
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '_' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
1728
/// Escape embedded double quotes so a label can sit inside a Mermaid `"..."`.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push_str("\\\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
1732
/// Fully-resolved input for one LLM node execution, handed to a
/// [`YamlWorkflowLlmExecutor`].
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    pub node_id: String,
    // NOTE(review): presumably true when this node ends the workflow, so its
    // streamed tokens are final output — confirm at the construction site.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    pub model: String,
    /// Explicit message history; `None` when only the prompt is used.
    pub messages: Option<Vec<Message>>,
    pub append_prompt_as_user: bool,
    /// Prompt after template resolution.
    pub prompt: String,
    /// Prompt as written in YAML, before template resolution.
    pub prompt_template: String,
    /// Resolutions of the template expressions found in `prompt_template`.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// Expected output JSON schema.
    pub schema: Value,
    pub stream: bool,
    /// Whether lenient JSON healing is enabled for the response.
    pub heal: bool,
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    pub execution_context: Value,
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub trace_sampled: bool,
}
1758
/// A tool definition resolved from YAML, with an optional output schema.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    /// Schema the tool result is expected to satisfy, when declared.
    pub output_schema: Option<Value>,
}

/// Executor that performs structured LLM completions for workflow nodes.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Run one structured completion for `request`, optionally streaming
    /// events through `event_sink`. Errors are reported as plain strings.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1773
/// Executor for `custom_worker` workflow nodes.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Execute `handler` with the node's `payload`, the workflow's email
    /// text, and the current execution context; returns the worker's JSON
    /// output, or an error message string on failure.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1784
/// Read and parse a YAML workflow file, then run it with `executor`.
///
/// # Errors
/// [`YamlWorkflowRunError::Read`] / [`YamlWorkflowRunError::Parse`] for file
/// problems, plus any error reported by `run_workflow_yaml`.
pub async fn run_workflow_yaml_file(
    workflow_path: &Path,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml(&workflow, workflow_input, executor).await
}
1804
1805pub async fn run_email_workflow_yaml_file(
1806    workflow_path: &Path,
1807    email_text: &str,
1808    executor: &dyn YamlWorkflowLlmExecutor,
1809) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1810    let workflow_input = json!({ "email_text": email_text });
1811    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1812}
1813
/// Read and parse a YAML workflow file, then run it against a
/// [`SimpleAgentsClient`].
///
/// # Errors
/// [`YamlWorkflowRunError::Read`] / [`YamlWorkflowRunError::Parse`] for file
/// problems, plus any error reported by `run_workflow_yaml_with_client`.
pub async fn run_workflow_yaml_file_with_client(
    workflow_path: &Path,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml_with_client(&workflow, workflow_input, client).await
}
1833
1834pub async fn run_email_workflow_yaml_file_with_client(
1835    workflow_path: &Path,
1836    email_text: &str,
1837    client: &SimpleAgentsClient,
1838) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1839    let workflow_input = json!({ "email_text": email_text });
1840    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1841}
1842
1843pub async fn run_workflow_yaml_with_client(
1844    workflow: &YamlWorkflow,
1845    workflow_input: &Value,
1846    client: &SimpleAgentsClient,
1847) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1848    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
1849}
1850
1851pub async fn run_email_workflow_yaml_with_client(
1852    workflow: &YamlWorkflow,
1853    email_text: &str,
1854    client: &SimpleAgentsClient,
1855) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1856    let workflow_input = json!({ "email_text": email_text });
1857    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1858}
1859
1860pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1861    workflow_path: &Path,
1862    workflow_input: &Value,
1863    client: &SimpleAgentsClient,
1864    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1865) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1866    let contents =
1867        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1868            path: workflow_path.display().to_string(),
1869            source,
1870        })?;
1871
1872    let workflow: YamlWorkflow =
1873        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1874            path: workflow_path.display().to_string(),
1875            source,
1876        })?;
1877
1878    run_workflow_yaml_with_client_and_custom_worker(
1879        &workflow,
1880        workflow_input,
1881        client,
1882        custom_worker,
1883    )
1884    .await
1885}
1886
1887pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1888    workflow_path: &Path,
1889    email_text: &str,
1890    client: &SimpleAgentsClient,
1891    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1892) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1893    let workflow_input = json!({ "email_text": email_text });
1894    run_workflow_yaml_file_with_client_and_custom_worker(
1895        workflow_path,
1896        &workflow_input,
1897        client,
1898        custom_worker,
1899    )
1900    .await
1901}
1902
1903pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1904    workflow_path: &Path,
1905    workflow_input: &Value,
1906    client: &SimpleAgentsClient,
1907    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1908    event_sink: Option<&dyn YamlWorkflowEventSink>,
1909) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1910    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1911        workflow_path,
1912        workflow_input,
1913        client,
1914        custom_worker,
1915        event_sink,
1916        &YamlWorkflowRunOptions::default(),
1917    )
1918    .await
1919}
1920
1921pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1922    workflow_path: &Path,
1923    workflow_input: &Value,
1924    client: &SimpleAgentsClient,
1925    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1926    event_sink: Option<&dyn YamlWorkflowEventSink>,
1927    options: &YamlWorkflowRunOptions,
1928) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1929    let contents =
1930        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1931            path: workflow_path.display().to_string(),
1932            source,
1933        })?;
1934
1935    let workflow: YamlWorkflow =
1936        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1937            path: workflow_path.display().to_string(),
1938            source,
1939        })?;
1940
1941    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1942        &workflow,
1943        workflow_input,
1944        client,
1945        custom_worker,
1946        event_sink,
1947        options,
1948    )
1949    .await
1950}
1951
1952pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1953    workflow_path: &Path,
1954    email_text: &str,
1955    client: &SimpleAgentsClient,
1956    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1957    event_sink: Option<&dyn YamlWorkflowEventSink>,
1958) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1959    let workflow_input = json!({ "email_text": email_text });
1960    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1961        workflow_path,
1962        &workflow_input,
1963        client,
1964        custom_worker,
1965        event_sink,
1966    )
1967    .await
1968}
1969
1970pub async fn run_workflow_yaml_with_client_and_custom_worker(
1971    workflow: &YamlWorkflow,
1972    workflow_input: &Value,
1973    client: &SimpleAgentsClient,
1974    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1975) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1976    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1977        workflow,
1978        workflow_input,
1979        client,
1980        custom_worker,
1981        None,
1982        &YamlWorkflowRunOptions::default(),
1983    )
1984    .await
1985}
1986
1987pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
1988    workflow: &YamlWorkflow,
1989    email_text: &str,
1990    client: &SimpleAgentsClient,
1991    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1992) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1993    let workflow_input = json!({ "email_text": email_text });
1994    run_workflow_yaml_with_client_and_custom_worker(
1995        workflow,
1996        &workflow_input,
1997        client,
1998        custom_worker,
1999    )
2000    .await
2001}
2002
2003pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2004    workflow: &YamlWorkflow,
2005    workflow_input: &Value,
2006    client: &SimpleAgentsClient,
2007    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2008    event_sink: Option<&dyn YamlWorkflowEventSink>,
2009) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2010    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2011        workflow,
2012        workflow_input,
2013        client,
2014        custom_worker,
2015        event_sink,
2016        &YamlWorkflowRunOptions::default(),
2017    )
2018    .await
2019}
2020
2021pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2022    workflow: &YamlWorkflow,
2023    workflow_input: &Value,
2024    client: &SimpleAgentsClient,
2025    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2026    event_sink: Option<&dyn YamlWorkflowEventSink>,
2027    options: &YamlWorkflowRunOptions,
2028) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2029    struct BorrowedClientExecutor<'a> {
2030        client: &'a SimpleAgentsClient,
2031        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2032        run_options: YamlWorkflowRunOptions,
2033    }
2034
2035    #[async_trait]
2036    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2037        async fn complete_structured(
2038            &self,
2039            request: YamlLlmExecutionRequest,
2040            event_sink: Option<&dyn YamlWorkflowEventSink>,
2041        ) -> Result<YamlLlmExecutionResult, String> {
2042            let expects_object = schema_expects_object(&request.schema);
2043            let messages = if let Some(mut history) = request.messages.clone() {
2044                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2045                    history.push(Message::user(&request.prompt));
2046                }
2047                history
2048            } else {
2049                vec![
2050                    Message::system("You execute workflow classification steps."),
2051                    Message::user(&request.prompt),
2052                ]
2053            };
2054
2055            if !request.tools.is_empty() {
2056                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2057                let mut conversation = messages;
2058                let mut usage_total: Option<YamlLlmTokenUsage> = None;
2059
2060                for roundtrip in 0..=request.max_tool_roundtrips {
2061                    let mut builder = CompletionRequest::builder()
2062                        .model(&request.model)
2063                        .messages(conversation.clone())
2064                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2065
2066                    if request.heal && expects_object {
2067                        builder = builder.json_schema("workflow_step", request.schema.clone());
2068                    }
2069
2070                    if let Some(choice) = request.tool_choice.clone() {
2071                        builder = builder.tool_choice(choice);
2072                    }
2073
2074                    if request.stream {
2075                        builder = builder.stream(true);
2076                    }
2077
2078                    let completion_request = builder
2079                        .build()
2080                        .map_err(|error| format!("failed to build completion request: {error}"))?;
2081
2082                    let outcome = self
2083                        .client
2084                        .complete(&completion_request, CompletionOptions::default())
2085                        .await
2086                        .map_err(|error| error.to_string())?;
2087
2088                    let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2089                    let mut streamed_content = String::new();
2090                    let mut finish_reason = FinishReason::Stop;
2091
2092                    match outcome {
2093                        CompletionOutcome::Response(response) => {
2094                            if let Some(usage) = usage_total.as_mut() {
2095                                usage.prompt_tokens += response.usage.prompt_tokens;
2096                                usage.completion_tokens += response.usage.completion_tokens;
2097                                usage.total_tokens += response.usage.total_tokens;
2098                            } else {
2099                                usage_total = Some(YamlLlmTokenUsage {
2100                                    prompt_tokens: response.usage.prompt_tokens,
2101                                    completion_tokens: response.usage.completion_tokens,
2102                                    total_tokens: response.usage.total_tokens,
2103                                    thinking_tokens: None,
2104                                });
2105                            }
2106
2107                            let choice = response
2108                                .choices
2109                                .first()
2110                                .ok_or_else(|| "completion returned no choices".to_string())?;
2111                            streamed_content = choice.message.content.clone();
2112                            streamed_tool_calls = choice.message.tool_calls.clone();
2113                            finish_reason = choice.finish_reason;
2114                        }
2115                        CompletionOutcome::HealedJson(healed) => {
2116                            let response = healed.response;
2117                            if let Some(usage) = usage_total.as_mut() {
2118                                usage.prompt_tokens += response.usage.prompt_tokens;
2119                                usage.completion_tokens += response.usage.completion_tokens;
2120                                usage.total_tokens += response.usage.total_tokens;
2121                            } else {
2122                                usage_total = Some(YamlLlmTokenUsage {
2123                                    prompt_tokens: response.usage.prompt_tokens,
2124                                    completion_tokens: response.usage.completion_tokens,
2125                                    total_tokens: response.usage.total_tokens,
2126                                    thinking_tokens: None,
2127                                });
2128                            }
2129
2130                            let choice = response
2131                                .choices
2132                                .first()
2133                                .ok_or_else(|| "completion returned no choices".to_string())?;
2134                            streamed_content = choice.message.content.clone();
2135                            streamed_tool_calls = choice.message.tool_calls.clone();
2136                            finish_reason = choice.finish_reason;
2137                        }
2138                        CompletionOutcome::CoercedSchema(coerced) => {
2139                            let response = coerced.response;
2140                            if let Some(usage) = usage_total.as_mut() {
2141                                usage.prompt_tokens += response.usage.prompt_tokens;
2142                                usage.completion_tokens += response.usage.completion_tokens;
2143                                usage.total_tokens += response.usage.total_tokens;
2144                            } else {
2145                                usage_total = Some(YamlLlmTokenUsage {
2146                                    prompt_tokens: response.usage.prompt_tokens,
2147                                    completion_tokens: response.usage.completion_tokens,
2148                                    total_tokens: response.usage.total_tokens,
2149                                    thinking_tokens: None,
2150                                });
2151                            }
2152
2153                            let choice = response
2154                                .choices
2155                                .first()
2156                                .ok_or_else(|| "completion returned no choices".to_string())?;
2157                            streamed_content = choice.message.content.clone();
2158                            streamed_tool_calls = choice.message.tool_calls.clone();
2159                            finish_reason = choice.finish_reason;
2160                        }
2161                        CompletionOutcome::Stream(mut stream) => {
2162                            let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2163                                None;
2164                            let mut delta_filter = StructuredJsonDeltaFilter::default();
2165                            let include_raw_debug = include_raw_stream_debug_events();
2166                            let mut json_text_formatter = if request.stream_json_as_text {
2167                                Some(StreamJsonAsTextFormatter::default())
2168                            } else {
2169                                None
2170                            };
2171                            let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2172
2173                            while let Some(chunk_result) = stream.next().await {
2174                                if event_sink_is_cancelled(event_sink) {
2175                                    return Err(workflow_event_sink_cancelled_message().to_string());
2176                                }
2177
2178                                let chunk = chunk_result.map_err(|error| error.to_string())?;
2179                                if let Some(usage) = chunk.usage {
2180                                    final_stream_usage = Some(usage);
2181                                }
2182
2183                                if let Some(choice) = chunk.choices.first() {
2184                                    if let Some(chunk_finish_reason) = choice.finish_reason {
2185                                        finish_reason = chunk_finish_reason;
2186                                    }
2187
2188                                    if include_raw_debug {
2189                                        if let Some(reasoning_delta) =
2190                                            choice.delta.reasoning_content.as_ref()
2191                                        {
2192                                            if let Some(sink) = event_sink {
2193                                                sink.emit(&YamlWorkflowEvent {
2194                                                    event_type: "node_stream_thinking_delta"
2195                                                        .to_string(),
2196                                                    node_id: Some(request.node_id.clone()),
2197                                                    step_id: Some(request.node_id.clone()),
2198                                                    node_kind: Some("llm_call".to_string()),
2199                                                    streamable: Some(true),
2200                                                    message: None,
2201                                                    delta: Some(reasoning_delta.clone()),
2202                                                    token_kind: Some(
2203                                                        YamlWorkflowTokenKind::Thinking,
2204                                                    ),
2205                                                    is_terminal_node_token: Some(
2206                                                        request.is_terminal_node,
2207                                                    ),
2208                                                    elapsed_ms: None,
2209                                                    metadata: None,
2210                                                });
2211                                            }
2212                                        }
2213                                    }
2214
2215                                    if let Some(delta) = choice.delta.content.clone() {
2216                                        streamed_content.push_str(delta.as_str());
2217                                        let (output_delta, thinking_delta) = if expects_object {
2218                                            delta_filter.split(delta.as_str())
2219                                        } else {
2220                                            (Some(delta.clone()), None)
2221                                        };
2222                                        let rendered_output_delta = if let Some(output_chunk) =
2223                                            output_delta
2224                                        {
2225                                            if let Some(formatter) = json_text_formatter.as_mut() {
2226                                                formatter.push(output_chunk.as_str());
2227                                                formatter.emit_if_ready(delta_filter.completed())
2228                                            } else {
2229                                                Some(output_chunk)
2230                                            }
2231                                        } else {
2232                                            None
2233                                        };
2234
2235                                        if include_raw_debug {
2236                                            if let Some(sink) = event_sink {
2237                                                if let Some(raw_thinking_delta) =
2238                                                    thinking_delta.as_ref()
2239                                                {
2240                                                    sink.emit(&YamlWorkflowEvent {
2241                                                        event_type: "node_stream_thinking_delta"
2242                                                            .to_string(),
2243                                                        node_id: Some(request.node_id.clone()),
2244                                                        step_id: Some(request.node_id.clone()),
2245                                                        node_kind: Some("llm_call".to_string()),
2246                                                        streamable: Some(true),
2247                                                        message: None,
2248                                                        delta: Some(raw_thinking_delta.clone()),
2249                                                        token_kind: Some(
2250                                                            YamlWorkflowTokenKind::Thinking,
2251                                                        ),
2252                                                        is_terminal_node_token: Some(
2253                                                            request.is_terminal_node,
2254                                                        ),
2255                                                        elapsed_ms: None,
2256                                                        metadata: None,
2257                                                    });
2258                                                }
2259                                                if let Some(raw_output_delta) =
2260                                                    rendered_output_delta.as_ref()
2261                                                {
2262                                                    sink.emit(&YamlWorkflowEvent {
2263                                                        event_type: "node_stream_output_delta"
2264                                                            .to_string(),
2265                                                        node_id: Some(request.node_id.clone()),
2266                                                        step_id: Some(request.node_id.clone()),
2267                                                        node_kind: Some("llm_call".to_string()),
2268                                                        streamable: Some(true),
2269                                                        message: None,
2270                                                        delta: Some(raw_output_delta.clone()),
2271                                                        token_kind: Some(
2272                                                            YamlWorkflowTokenKind::Output,
2273                                                        ),
2274                                                        is_terminal_node_token: Some(
2275                                                            request.is_terminal_node,
2276                                                        ),
2277                                                        elapsed_ms: None,
2278                                                        metadata: None,
2279                                                    });
2280                                                }
2281                                            }
2282                                        }
2283
2284                                        if let Some(filtered_delta) = rendered_output_delta {
2285                                            if let Some(sink) = event_sink {
2286                                                sink.emit(&YamlWorkflowEvent {
2287                                                    event_type: "node_stream_delta".to_string(),
2288                                                    node_id: Some(request.node_id.clone()),
2289                                                    step_id: Some(request.node_id.clone()),
2290                                                    node_kind: Some("llm_call".to_string()),
2291                                                    streamable: Some(true),
2292                                                    message: None,
2293                                                    delta: Some(filtered_delta),
2294                                                    token_kind: Some(YamlWorkflowTokenKind::Output),
2295                                                    is_terminal_node_token: Some(
2296                                                        request.is_terminal_node,
2297                                                    ),
2298                                                    elapsed_ms: None,
2299                                                    metadata: None,
2300                                                });
2301                                            }
2302                                        }
2303                                    }
2304
2305                                    if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2306                                    {
2307                                        for tool_call_delta in tool_call_deltas {
2308                                            let entry = tool_calls_by_index
2309                                                .entry(tool_call_delta.index)
2310                                                .or_insert_with(|| ToolCall {
2311                                                    id: tool_call_delta.id.clone().unwrap_or_else(
2312                                                        || {
2313                                                            format!(
2314                                                                "tool_call_{}",
2315                                                                tool_call_delta.index
2316                                                            )
2317                                                        },
2318                                                    ),
2319                                                    tool_type: ToolType::Function,
2320                                                    function:
2321                                                        simple_agent_type::tool::ToolCallFunction {
2322                                                            name: String::new(),
2323                                                            arguments: String::new(),
2324                                                        },
2325                                                });
2326
2327                                            if let Some(id) = tool_call_delta.id.as_ref() {
2328                                                entry.id = id.clone();
2329                                            }
2330                                            if let Some(tool_type) = tool_call_delta.tool_type {
2331                                                entry.tool_type = tool_type;
2332                                            }
2333                                            if let Some(function_delta) =
2334                                                tool_call_delta.function.as_ref()
2335                                            {
2336                                                if let Some(name) = function_delta.name.as_ref() {
2337                                                    entry.function.name = name.clone();
2338                                                }
2339                                                if let Some(arguments) =
2340                                                    function_delta.arguments.as_ref()
2341                                                {
2342                                                    entry.function.arguments.push_str(arguments);
2343                                                }
2344                                            }
2345                                        }
2346                                    }
2347                                }
2348
2349                                if event_sink_is_cancelled(event_sink) {
2350                                    return Err(workflow_event_sink_cancelled_message().to_string());
2351                                }
2352                            }
2353
2354                            if let Some(usage) = final_stream_usage {
2355                                if let Some(total) = usage_total.as_mut() {
2356                                    total.prompt_tokens += usage.prompt_tokens;
2357                                    total.completion_tokens += usage.completion_tokens;
2358                                    total.total_tokens += usage.total_tokens;
2359                                } else {
2360                                    usage_total = Some(YamlLlmTokenUsage {
2361                                        prompt_tokens: usage.prompt_tokens,
2362                                        completion_tokens: usage.completion_tokens,
2363                                        total_tokens: usage.total_tokens,
2364                                        thinking_tokens: None,
2365                                    });
2366                                }
2367                            }
2368
2369                            let mut ordered_tool_calls =
2370                                tool_calls_by_index.into_iter().collect::<Vec<_>>();
2371                            ordered_tool_calls.sort_by_key(|(index, _)| *index);
2372                            if !ordered_tool_calls.is_empty() {
2373                                streamed_tool_calls = Some(
2374                                    ordered_tool_calls
2375                                        .into_iter()
2376                                        .map(|(_, tool_call)| tool_call)
2377                                        .collect::<Vec<_>>(),
2378                                );
2379                            }
2380                        }
2381                    }
2382
2383                    let has_tool_calls = streamed_tool_calls
2384                        .as_ref()
2385                        .is_some_and(|calls| !calls.is_empty());
2386                    if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
2387                        let payload = if expects_object {
2388                            parse_streamed_structured_payload(
2389                                streamed_content.as_str(),
2390                                request.heal,
2391                            )
2392                            .map_err(|error| {
2393                                format!("failed to parse structured completion JSON: {error}")
2394                            })?
2395                            .payload
2396                        } else {
2397                            Value::String(streamed_content.clone())
2398                        };
2399                        return Ok(YamlLlmExecutionResult {
2400                            payload,
2401                            usage: usage_total,
2402                            ttft_ms: None,
2403                            tool_calls: tool_traces,
2404                        });
2405                    }
2406
2407                    if roundtrip >= request.max_tool_roundtrips {
2408                        return Err(format!(
2409                            "tool call roundtrip limit reached for node '{}' (max={})",
2410                            request.node_id, request.max_tool_roundtrips
2411                        ));
2412                    }
2413
2414                    let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
2415                        "finish_reason=tool_calls but no tool calls found".to_string()
2416                    })?;
2417                    if tool_calls
2418                        .iter()
2419                        .any(|tool_call| tool_call.function.name.trim().is_empty())
2420                    {
2421                        return Err("streamed tool call missing function name".to_string());
2422                    }
2423
2424                    let assistant_tool_message =
2425                        Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
2426                    conversation.push(assistant_tool_message);
2427
                    // Execute each requested tool sequentially; every result (or
                    // failure) is traced, emitted to the event sink, and the
                    // serialized output is appended to the conversation.
                    for tool_call in tool_calls {
                        let tool_call_id = tool_call.id.clone();
                        let tool_name = tool_call.function.name.clone();
                        let tool_started = Instant::now();
                        // Provider tool arguments arrive as a JSON string; reject
                        // anything that does not parse.
                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
                            .map_err(|error| {
                                format!(
                                    "tool '{}' arguments must be valid JSON: {}",
                                    tool_name, error
                                )
                            })?;
                        // Open a tracing span per tool invocation when sampled.
                        let mut tool_span_context: Option<TraceContext> = None;
                        let mut tool_span = if request.trace_sampled {
                            let (span_context, mut span) = workflow_tracer().start_span(
                                "workflow.tool.execute",
                                SpanKind::Node,
                                request.trace_context.as_ref(),
                            );
                            tool_span_context = Some(span_context);
                            span.set_attribute(
                                "trace_id",
                                request.trace_id.as_deref().unwrap_or_default(),
                            );
                            span.set_attribute("node_id", request.node_id.as_str());
                            span.set_attribute("node_kind", "llm_call");
                            span.set_attribute("tool_name", tool_name.as_str());
                            span.set_attribute("tool_call_id", tool_call_id.as_str());
                            // Arguments may be redacted/truncated by the trace mode.
                            let args_for_span =
                                payload_for_tool_trace(request.tool_trace_mode, &arguments)
                                    .to_string();
                            span.set_attribute("tool_arguments", args_for_span.as_str());
                            Some(span)
                        } else {
                            None
                        };

                        // Announce the request to the event sink unless tracing is off.
                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_requested".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call requested: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                    })),
                                });
                            }
                        }

                        // The model may only call tools declared for this node.
                        let Some(tool_config) = request
                            .tools
                            .iter()
                            .find(|tool| tool.definition.function.name == tool_name)
                        else {
                            return Err(format!("model requested unknown tool '{}'", tool_name));
                        };

                        // Dispatch: the built-in sub-workflow tool is handled here,
                        // everything else goes to the optional custom worker.
                        let tool_output_result = if tool_name == "run_workflow_graph" {
                            execute_subworkflow_tool_call(
                                &arguments,
                                &request.execution_context,
                                self.client,
                                self.custom_worker,
                                &self.run_options,
                                tool_span_context.as_ref(),
                                request.trace_id.as_deref(),
                            )
                            .await
                        } else if let Some(custom_worker) = self.custom_worker {
                            custom_worker
                                .execute(
                                    tool_name.as_str(),
                                    &arguments,
                                    request.email_text.as_str(),
                                    &request.execution_context,
                                )
                                .await
                        } else {
                            Err(format!(
                                "tool '{}' requested but no custom worker executor is configured",
                                tool_name
                            ))
                        };

                        let tool_output = match tool_output_result {
                            Ok(output) => output,
                            Err(message) => {
                                // Failure path: record span/event/trace, close the
                                // span, then abort the whole node with an error.
                                let elapsed_ms = tool_started.elapsed().as_millis();
                                if let Some(span) = tool_span.as_mut() {
                                    span.add_event("workflow.tool.execute.error");
                                    span.set_attribute("tool_status", "error");
                                    span.set_attribute("tool_error", message.as_str());
                                    span.set_attribute(
                                        "elapsed_ms",
                                        elapsed_ms.to_string().as_str(),
                                    );
                                }
                                if request.tool_trace_mode != YamlToolTraceMode::Off {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_tool_call_failed".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(false),
                                            message: Some(message.clone()),
                                            delta: None,
                                            token_kind: None,
                                            is_terminal_node_token: None,
                                            elapsed_ms: Some(elapsed_ms),
                                            metadata: Some(json!({
                                                "tool_call_id": tool_call_id.clone(),
                                                "tool_name": tool_name.clone(),
                                            })),
                                        });
                                    }
                                }
                                tool_traces.push(YamlToolCallTrace {
                                    id: tool_call_id.clone(),
                                    name: tool_name.clone(),
                                    arguments,
                                    output: None,
                                    status: "error".to_string(),
                                    elapsed_ms,
                                    error: Some(message.clone()),
                                });
                                if let Some(span) = tool_span.take() {
                                    span.end();
                                }
                                return Err(format!("tool '{}' failed: {}", tool_name, message));
                            }
                        };

                        // Validate the tool's output against its declared schema,
                        // when one was configured.
                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
                            validate_schema_instance(output_schema, &tool_output).map_err(
                                |message| {
                                    format!(
                                        "tool '{}' output failed schema validation: {}",
                                        tool_name, message
                                    )
                                },
                            )?;
                        }

                        // Success path: finish the span, emit the completion event,
                        // and record the trace entry.
                        let elapsed_ms = tool_started.elapsed().as_millis();
                        if let Some(span) = tool_span.as_mut() {
                            span.add_event("workflow.tool.execute.completed");
                            span.set_attribute("tool_status", "ok");
                            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
                            let output_for_span =
                                payload_for_tool_trace(request.tool_trace_mode, &tool_output)
                                    .to_string();
                            span.set_attribute("tool_output", output_for_span.as_str());
                        }
                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_completed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call completed: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: Some(elapsed_ms),
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
                                    })),
                                });
                            }
                        }

                        tool_traces.push(YamlToolCallTrace {
                            id: tool_call_id.clone(),
                            name: tool_name.clone(),
                            arguments: arguments.clone(),
                            output: Some(tool_output.clone()),
                            status: "ok".to_string(),
                            elapsed_ms,
                            error: None,
                        });

                        // Feed the serialized tool output back into the
                        // conversation so the next roundtrip can use it.
                        conversation.push(Message::tool(
                            serde_json::to_string(&tool_output).map_err(|error| {
                                format!("failed to serialize tool output: {error}")
                            })?,
                            tool_call_id,
                        ));
                        if let Some(span) = tool_span.take() {
                            span.end();
                        }
                    }
                    // One full model<->tools roundtrip finished; notify listeners
                    // (1-based index) before looping back for the next completion.
                    if request.tool_trace_mode != YamlToolTraceMode::Off {
                        if let Some(sink) = event_sink {
                            sink.emit(&YamlWorkflowEvent {
                                event_type: "node_tool_roundtrip_completed".to_string(),
                                node_id: Some(request.node_id.clone()),
                                step_id: Some(request.node_id.clone()),
                                node_kind: Some("llm_call".to_string()),
                                streamable: Some(false),
                                message: Some(format!(
                                    "tool roundtrip {} completed",
                                    roundtrip + 1
                                )),
                                delta: None,
                                token_kind: None,
                                is_terminal_node_token: None,
                                elapsed_ms: None,
                                metadata: Some(json!({
                                    "roundtrip": roundtrip + 1,
                                    "max_tool_roundtrips": request.max_tool_roundtrips,
                                })),
                            });
                        }
                    }
                }

                // The roundtrip loop ran out of iterations without producing a
                // terminal (non-tool-call) completion.
                return Err(format!(
                    "tool-enabled llm_call '{}' exhausted loop without final payload",
                    request.node_id
                ));
            }
2671
            // Non-tool path: issue a single completion request, optionally
            // streamed and optionally schema-constrained with JSON healing.
            let mut builder = CompletionRequest::builder()
                .model(&request.model)
                .messages(messages);

            // Healed-JSON mode only applies to non-streaming structured output.
            if request.heal && !request.stream && expects_object {
                builder = builder.json_schema("workflow_step", request.schema.clone());
            }

            if request.stream {
                builder = builder.stream(true);
            }

            let completion_request = builder
                .build()
                .map_err(|error| format!("failed to build completion request: {error}"))?;

            // Options must mirror the builder: HealedJson mode pairs with the
            // json_schema set above, everything else uses the defaults.
            let completion_options = if request.heal && !request.stream && expects_object {
                CompletionOptions {
                    mode: CompletionMode::HealedJson,
                }
            } else {
                CompletionOptions::default()
            };

            let outcome = self
                .client
                .complete(&completion_request, completion_options)
                .await
                .map_err(|error| error.to_string())?;
            // Convert the client outcome (streamed, plain, healed, or coerced)
            // into a YamlLlmExecutionResult.
            match outcome {
                CompletionOutcome::Stream(mut stream) => {
                    let mut aggregated = String::new();
                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
                    let stream_started = Instant::now();
                    let mut ttft_ms: Option<u128> = None;
                    // Separates structured-JSON output tokens from thinking tokens.
                    let mut delta_filter = StructuredJsonDeltaFilter::default();
                    let include_raw_debug = include_raw_stream_debug_events();
                    let mut json_text_formatter = if request.stream_json_as_text {
                        Some(StreamJsonAsTextFormatter::default())
                    } else {
                        None
                    };
                    while let Some(chunk_result) = stream.next().await {
                        // Honor sink cancellation both before and after
                        // processing each chunk.
                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                        let chunk = chunk_result.map_err(|error| error.to_string())?;
                        if let Some(usage) = chunk.usage {
                            final_stream_usage = Some(usage);
                        }
                        if let Some(choice) = chunk.choices.first() {
                            // Time-to-first-token: first non-empty content or
                            // reasoning delta.
                            if ttft_ms.is_none()
                                && (choice
                                    .delta
                                    .content
                                    .as_ref()
                                    .is_some_and(|delta| !delta.is_empty())
                                    || choice
                                        .delta
                                        .reasoning_content
                                        .as_ref()
                                        .is_some_and(|delta| !delta.is_empty()))
                            {
                                ttft_ms = Some(stream_started.elapsed().as_millis());
                            }
                            // Raw debug events mirror provider reasoning deltas.
                            if include_raw_debug {
                                if let Some(reasoning_delta) =
                                    choice.delta.reasoning_content.as_ref()
                                {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_thinking_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(reasoning_delta.clone()),
                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta.content.clone() {
                                aggregated.push_str(delta.as_str());
                                // Structured output goes through the delta filter,
                                // plain text passes through untouched.
                                let (output_delta, thinking_delta) = if expects_object {
                                    delta_filter.split(delta.as_str())
                                } else {
                                    (Some(delta.clone()), None)
                                };
                                // Optionally re-render JSON fragments as text.
                                let rendered_output_delta = if let Some(output_chunk) = output_delta
                                {
                                    if let Some(formatter) = json_text_formatter.as_mut() {
                                        formatter.push(output_chunk.as_str());
                                        formatter.emit_if_ready(delta_filter.completed())
                                    } else {
                                        Some(output_chunk)
                                    }
                                } else {
                                    None
                                };
                                if include_raw_debug {
                                    if let Some(sink) = event_sink {
                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_thinking_delta"
                                                    .to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_thinking_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                        if let Some(raw_output_delta) =
                                            rendered_output_delta.as_ref()
                                        {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_output_delta".to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_output_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Output),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                    }
                                }
                                // The filtered delta is the user-visible stream.
                                if let Some(filtered_delta) = rendered_output_delta {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(filtered_delta),
                                            token_kind: Some(YamlWorkflowTokenKind::Output),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                        }

                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                    }

                    // Stream finished: parse the aggregated structured payload
                    // (emitting a node_healed event if healing kicked in), or
                    // wrap the raw text.
                    let payload = if expects_object {
                        let resolved =
                            parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
                        if let Some(confidence) = resolved.heal_confidence {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_healed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(true),
                                    message: Some(format!(
                                        "healed streamed structured response confidence={confidence}"
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: None,
                                });
                            }
                        }
                        resolved.payload
                    } else {
                        Value::String(aggregated)
                    };

                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
                            prompt_tokens: usage.prompt_tokens,
                            completion_tokens: usage.completion_tokens,
                            total_tokens: usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::Response(response) => {
                    // Non-streaming response: parse JSON for structured nodes,
                    // otherwise pass the text through.
                    let payload = if expects_object {
                        let content = response
                            .content()
                            .ok_or_else(|| "completion returned empty content".to_string())?;
                        serde_json::from_str(content).map_err(|error| {
                            format!("failed to parse structured completion JSON: {error}")
                        })?
                    } else {
                        Value::String(response.content().unwrap_or_default().to_string())
                    };

                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: response.usage.prompt_tokens,
                            completion_tokens: response.usage.completion_tokens,
                            total_tokens: response.usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::HealedJson(healed) => {
                    // Healed JSON is only meaningful for object schemas.
                    if !expects_object {
                        return Err(
                            "healed json outcome is unsupported for non-object schema".to_string()
                        );
                    }
                    if let Some(sink) = event_sink {
                        sink.emit(&YamlWorkflowEvent {
                            event_type: "node_healed".to_string(),
                            node_id: Some(request.node_id.clone()),
                            step_id: Some(request.node_id.clone()),
                            node_kind: Some("llm_call".to_string()),
                            streamable: Some(request.stream),
                            message: Some(format!(
                                "healed structured response confidence={}",
                                healed.parsed.confidence
                            )),
                            delta: None,
                            token_kind: None,
                            is_terminal_node_token: None,
                            elapsed_ms: None,
                            metadata: None,
                        });
                    }
                    Ok(YamlLlmExecutionResult {
                        payload: healed.parsed.value,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: healed.response.usage.prompt_tokens,
                            completion_tokens: healed.response.usage.completion_tokens,
                            total_tokens: healed.response.usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::CoercedSchema(coerced) => {
                    // Schema coercion likewise requires an object schema.
                    if !expects_object {
                        return Err(
                            "coerced schema outcome is unsupported for non-object schema"
                                .to_string(),
                        );
                    }
                    Ok(YamlLlmExecutionResult {
                        payload: coerced.coerced.value,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: coerced.response.usage.prompt_tokens,
                            completion_tokens: coerced.response.usage.completion_tokens,
                            total_tokens: coerced.response.usage.total_tokens,
                            thinking_tokens: None,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
            }
        }
    }
2964
2965    let executor = BorrowedClientExecutor {
2966        client,
2967        custom_worker,
2968        run_options: options.clone(),
2969    };
2970    run_workflow_yaml_with_custom_worker_and_events_and_options(
2971        workflow,
2972        workflow_input,
2973        &executor,
2974        custom_worker,
2975        event_sink,
2976        options,
2977    )
2978    .await
2979}
2980
2981pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
2982    workflow: &YamlWorkflow,
2983    email_text: &str,
2984    client: &SimpleAgentsClient,
2985    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2986    event_sink: Option<&dyn YamlWorkflowEventSink>,
2987) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2988    let workflow_input = json!({ "email_text": email_text });
2989    run_workflow_yaml_with_client_and_custom_worker_and_events(
2990        workflow,
2991        &workflow_input,
2992        client,
2993        custom_worker,
2994        event_sink,
2995    )
2996    .await
2997}
2998
2999pub async fn run_workflow_yaml(
3000    workflow: &YamlWorkflow,
3001    workflow_input: &Value,
3002    executor: &dyn YamlWorkflowLlmExecutor,
3003) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3004    run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3005        .await
3006}
3007
3008pub async fn run_email_workflow_yaml(
3009    workflow: &YamlWorkflow,
3010    email_text: &str,
3011    executor: &dyn YamlWorkflowLlmExecutor,
3012) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3013    let workflow_input = json!({ "email_text": email_text });
3014    run_workflow_yaml(workflow, &workflow_input, executor).await
3015}
3016
3017pub async fn run_workflow_yaml_with_custom_worker(
3018    workflow: &YamlWorkflow,
3019    workflow_input: &Value,
3020    executor: &dyn YamlWorkflowLlmExecutor,
3021    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3022) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3023    run_workflow_yaml_with_custom_worker_and_events(
3024        workflow,
3025        workflow_input,
3026        executor,
3027        custom_worker,
3028        None,
3029    )
3030    .await
3031}
3032
3033pub async fn run_email_workflow_yaml_with_custom_worker(
3034    workflow: &YamlWorkflow,
3035    email_text: &str,
3036    executor: &dyn YamlWorkflowLlmExecutor,
3037    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3038) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3039    let workflow_input = json!({ "email_text": email_text });
3040    run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3041}
3042
3043pub async fn run_workflow_yaml_with_custom_worker_and_events(
3044    workflow: &YamlWorkflow,
3045    workflow_input: &Value,
3046    executor: &dyn YamlWorkflowLlmExecutor,
3047    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3048    event_sink: Option<&dyn YamlWorkflowEventSink>,
3049) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3050    run_workflow_yaml_with_custom_worker_and_events_and_options(
3051        workflow,
3052        workflow_input,
3053        executor,
3054        custom_worker,
3055        event_sink,
3056        &YamlWorkflowRunOptions::default(),
3057    )
3058    .await
3059}
3060
/// Core sequential interpreter for a YAML-defined workflow.
///
/// Validates the input and the workflow, then walks nodes starting from
/// `workflow.entry_node`, following each node's single outgoing edge until a
/// node with no outgoing edge (the terminal node) completes. Three node kinds
/// are handled: `llm_call`, `switch`, and `custom_worker`.
///
/// # Parameters
/// - `workflow`: parsed YAML workflow definition (nodes + edges).
/// - `workflow_input`: must be a JSON object; exposed to prompt templates and
///   switch conditions as `input`.
/// - `executor`: performs structured LLM completions for `llm_call` nodes.
/// - `custom_worker`: optional handler for `custom_worker` nodes; when absent
///   a mock output is produced via `mock_custom_worker_output`.
/// - `event_sink`: optional observer for lifecycle events; it is also polled
///   for cooperative cancellation before and after each node.
/// - `options`: run options (model override, telemetry/trace configuration).
///
/// # Errors
/// Returns `YamlWorkflowRunError` for non-object input, validation
/// diagnostics, missing entry/node references, node execution failures,
/// unsupported node kinds, or event-sink cancellation.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    // The templating context exposes `input` as an object; reject anything else early.
    if !workflow_input.is_object() {
        return Err(YamlWorkflowRunError::InvalidInput {
            message: "workflow input must be a JSON object".to_string(),
        });
    }

    validate_sample_rate(options.telemetry.sample_rate)?;

    // Optional convenience field; empty string when the caller did not supply it.
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default();

    // Static validation: abort if any diagnostic is an error (warnings pass through).
    let diagnostics = verify_yaml_workflow(workflow);
    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
        .iter()
        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
        .cloned()
        .collect();
    if !errors.is_empty() {
        return Err(YamlWorkflowRunError::Validation {
            diagnostics_count: errors.len(),
            diagnostics: errors,
        });
    }

    // Fast path: if the workflow can be lowered to the IR runtime, run it there.
    // `Ok(None)` means the IR path declined (e.g. unsupported node) and we fall
    // through to the interpreter below.
    if let Some(output) =
        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
            .await?
    {
        return Ok(output);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Root span for the whole run; only created when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // NOTE(review): every `return Err(...)` from here on drops `workflow_span`
    // (and later `node_span`) without calling `.end()`; spans are only ended on
    // the success path. Confirm the tracer tolerates dropped/unended spans.
    if workflow.nodes.is_empty() {
        return Err(YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        });
    }

    // Node lookup by id; the entry node must exist.
    let node_map: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();
    if !node_map.contains_key(workflow.entry_node.as_str()) {
        return Err(YamlWorkflowRunError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // One outgoing edge per node. NOTE(review): if a node has multiple outgoing
    // edges, later ones silently overwrite earlier ones here — presumably the
    // verifier above rejects that shape; confirm.
    let edge_map: HashMap<&str, &str> = workflow
        .edges
        .iter()
        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
        .collect();

    // Interpreter state: current node id, visited trace, per-node outputs,
    // mutable globals, timing/token accounting.
    let mut current = workflow.entry_node.clone();
    let mut trace = Vec::new();
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    let mut globals = serde_json::Map::new();
    let mut step_timings = Vec::new();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    let mut token_totals = YamlTokenTotals::default();
    let mut workflow_ttft_ms: Option<u128> = None;
    let started = Instant::now();

    if let Some(sink) = event_sink {
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_started".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("workflow_id={}", workflow.id)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(0),
            metadata: None,
        });
    }

    // Cooperative cancellation: the sink can abort the run between steps.
    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    loop {
        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Resolve the current node; an edge pointing at a non-existent node is an error.
        let node =
            *node_map
                .get(current.as_str())
                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
                    node_id: current.clone(),
                })?;

        trace.push(node.id.clone());
        let step_started = Instant::now();

        // Per-node span, parented under the workflow span when sampled.
        let mut node_span_context: Option<TraceContext> = None;
        let mut node_span = if telemetry_context.sampled {
            let (span_context, mut span) = tracer.start_span(
                "workflow.node.execute",
                SpanKind::Node,
                workflow_span_context.as_ref(),
            );
            node_span_context = Some(span_context);
            span.set_attribute(
                "trace_id",
                telemetry_context.trace_id.as_deref().unwrap_or_default(),
            );
            span.set_attribute("node_id", node.id.as_str());
            span.set_attribute("node_kind", node.kind_name());
            Some(span)
        } else {
            None
        };

        // A node streams only when it is an llm_call with stream=true and
        // heal=false (healing requires the full response before parsing).
        let node_streamable = node
            .node_type
            .llm_call
            .as_ref()
            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_started".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: if node_streamable == Some(false) {
                    Some("Node is not streamable; status events only".to_string())
                } else {
                    None
                },
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(workflow_elapsed_before_node_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // `next` holds the id of the following node; `None` means terminal.
        let mut node_usage: Option<YamlLlmTokenUsage> = None;
        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
        let next = if let Some(llm) = &node.node_type.llm_call {
            // --- llm_call node ---------------------------------------------
            let prompt_template = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.prompt.as_deref())
                .unwrap_or_default();
            // Template context visible to prompts: raw input, prior node
            // outputs, and the mutable globals map.
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            // Optional explicit message list pulled from the context by path.
            let messages = if let Some(path) = llm.messages_path.as_deref() {
                Some(
                    parse_messages_from_context(path, &context).map_err(|message| {
                        YamlWorkflowRunError::Llm {
                            node_id: node.id.clone(),
                            message,
                        }
                    })?,
                )
            } else {
                None
            };
            let prompt_bindings = collect_template_bindings(prompt_template, &context);
            let prompt = interpolate_template(prompt_template, &context);
            let schema = llm_output_schema_for_node(node);

            let request = YamlLlmExecutionRequest {
                node_id: node.id.clone(),
                is_terminal_node,
                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
                // Run-level model override wins over the node's model, but only
                // when it is non-empty after trimming.
                model: options
                    .model
                    .as_ref()
                    .and_then(|model| {
                        let trimmed = model.trim();
                        if trimmed.is_empty() {
                            None
                        } else {
                            Some(trimmed.to_string())
                        }
                    })
                    .unwrap_or_else(|| llm.model.clone()),
                messages,
                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
                prompt,
                prompt_template: prompt_template.to_string(),
                prompt_bindings,
                schema,
                stream: llm.stream.unwrap_or(false),
                heal: llm.heal.unwrap_or(false),
                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?,
                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
                    YamlWorkflowRunError::Llm {
                        node_id: node.id.clone(),
                        message,
                    }
                })?,
                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
                tool_calls_global_key: llm.tool_calls_global_key.clone(),
                tool_trace_mode: options.telemetry.tool_trace_mode,
                execution_context: context.clone(),
                email_text: email_text.to_string(),
                trace_id: telemetry_context.trace_id.clone(),
                trace_context: node_span_context.clone(),
                trace_sampled: telemetry_context.sampled,
            };

            if let Some(span) = node_span.as_mut() {
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &context).as_str(),
                );
            }

            // Telemetry event describing the fully-resolved LLM input.
            if let Some(sink) = event_sink {
                sink.emit(&YamlWorkflowEvent {
                    event_type: "node_llm_input_resolved".to_string(),
                    node_id: Some(node.id.clone()),
                    step_id: Some(node.id.clone()),
                    node_kind: Some("llm_call".to_string()),
                    streamable: Some(request.stream),
                    message: Some("resolved llm input for telemetry".to_string()),
                    delta: None,
                    token_kind: None,
                    is_terminal_node_token: None,
                    elapsed_ms: Some(started.elapsed().as_millis()),
                    metadata: Some(json!({
                        "model": request.model.clone(),
                        "stream_requested": request.stream,
                        "stream_json_as_text": request.stream_json_as_text,
                        "heal_requested": request.heal,
                        "effective_stream": request.stream,
                        "prompt_template": request.prompt_template.clone(),
                        "prompt": request.prompt.clone(),
                        "schema": request.schema.clone(),
                        "bindings": request.prompt_bindings.clone(),
                        "tools_count": request.tools.len(),
                        "max_tool_roundtrips": request.max_tool_roundtrips,
                    })),
                });
            }

            if event_sink_is_cancelled(event_sink) {
                return Err(YamlWorkflowRunError::EventSinkCancelled {
                    message: workflow_event_sink_cancelled_message().to_string(),
                });
            }

            // Delegate the actual completion to the pluggable executor.
            let llm_result = executor
                .complete_structured(request, event_sink)
                .await
                .map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?;

            // Accumulate token usage; capture workflow-level TTFT from the
            // first node that reports one (offset by time already elapsed).
            if let Some(usage) = llm_result.usage.as_ref() {
                token_totals.add_usage(usage);
            }
            if workflow_ttft_ms.is_none() {
                workflow_ttft_ms = llm_result
                    .ttft_ms
                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
            }
            node_usage = llm_result.usage;

            let payload = llm_result.payload;
            let tool_calls = llm_result.tool_calls;

            // Node output shape: { "output": ..., "tool_calls": [...]? }.
            let mut node_output = json!({ "output": payload });
            if !tool_calls.is_empty() {
                if let Some(output_obj) = node_output.as_object_mut() {
                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
                }
            }
            outputs.insert(node.id.clone(), node_output);
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            // Apply declarative global mutations configured on the node.
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            // Optionally mirror this node's tool calls into a named global.
            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if let Some(node_tool_calls) = outputs
                    .get(node.id.as_str())
                    .and_then(|value| value.get("tool_calls"))
                    .cloned()
                {
                    globals.insert(global_key.clone(), node_tool_calls);
                }
            }
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else if let Some(switch) = &node.node_type.switch {
            // --- switch node: first matching branch wins, else default ------
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            let mut chosen = Some(switch.default.clone());
            for branch in &switch.branches {
                // NOTE(review): a condition-evaluation error propagates here
                // via `?` without being wrapped with this node's id.
                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
                    chosen = Some(branch.target.clone());
                    break;
                }
            }
            // NOTE(review): `chosen` starts as `Some(default)`, so this
            // `ok_or_else` can never fire as written.
            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
                node_id: node.id.clone(),
            })?;
            Some(chosen)
        } else if let Some(custom) = &node.node_type.custom_worker {
            // --- custom_worker node -----------------------------------------
            // Static payload from node config; defaults to an empty object.
            let payload = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.payload.as_ref())
                .cloned()
                .unwrap_or_else(|| json!({}));
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });

            if let Some(span) = node_span.as_mut() {
                span.set_attribute("handler_name", custom.handler.as_str());
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
                );
            }

            // Dedicated span for the handler invocation itself.
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if telemetry_context.sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    workflow_span_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                span.set_attribute(
                    "trace_id",
                    telemetry_context.trace_id.as_deref().unwrap_or_default(),
                );
                span.set_attribute("handler_name", custom.handler.as_str());
                apply_trace_tenant_attributes(span.as_mut(), options);
                Some(span)
            } else {
                None
            };

            // Propagate trace/tenant info into the worker's execution context.
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                telemetry_context.trace_id.as_deref(),
                options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &options.trace.tenant,
            );

            // Use the real executor when provided, otherwise a mock output so
            // workflows remain runnable without custom workers wired in.
            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
                custom_worker_executor
                    .execute(
                        custom.handler.as_str(),
                        &payload,
                        email_text,
                        &worker_context,
                    )
                    .await
                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
                        node_id: node.id.clone(),
                        message,
                    })
            } else {
                mock_custom_worker_output(custom.handler.as_str(), &payload)
            };

            // End the handler span before error propagation so it is closed
            // even when the worker failed.
            if let Some(span) = handler_span.take() {
                span.end();
            }

            let worker_output = worker_output_result?;

            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else {
            return Err(YamlWorkflowRunError::UnsupportedNodeType {
                node_id: node.id.clone(),
            });
        };

        // Record per-step timing and derived token throughput.
        let node_kind = node.kind_name().to_string();
        let elapsed_ms = step_started.elapsed().as_millis();
        step_timings.push(YamlStepTiming {
            node_id: node.id.clone(),
            node_kind,
            elapsed_ms,
            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
            thinking_tokens: node_usage.as_ref().and_then(|usage| usage.thinking_tokens),
            tokens_per_second: node_usage
                .as_ref()
                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
        });

        // Keyed LLM metrics only exist for nodes that reported usage.
        if let Some(usage) = node_usage.as_ref() {
            llm_node_metrics.insert(
                node.id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(
                        usage.completion_tokens,
                        elapsed_ms,
                    ),
                },
            );
        }

        // Close the node span on the success path.
        if let Some(mut span) = node_span.take() {
            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
            span.add_event("node_completed");
            span.end();
        }

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_completed".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: None,
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(elapsed_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Advance to the next node, or stop at a terminal node.
        if let Some(next) = next {
            current = next;
            continue;
        }
        break;
    }

    // The last visited node is the terminal one; the loop guarantees at least
    // one iteration, so this is effectively infallible.
    let terminal_node = trace
        .last()
        .cloned()
        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        })?;

    let terminal_output = outputs
        .get(terminal_node.as_str())
        .and_then(|value| value.get("output"))
        .cloned();

    // Assemble the run output, including timing/token aggregates and optional
    // trace metadata when a trace id was resolved.
    let total_elapsed_ms = started.elapsed().as_millis();
    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text: email_text.to_string(),
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        total_elapsed_ms,
        ttft_ms: workflow_ttft_ms,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    if let Some(sink) = event_sink {
        // Attach nerd stats only when explicitly enabled in telemetry options.
        let event_metadata = if options.telemetry.nerdstats {
            Some(json!({
                "nerdstats": workflow_nerdstats(&output),
            }))
        } else {
            None
        };
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_completed".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("terminal_node={}", output.terminal_node)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(output.total_elapsed_ms),
            metadata: event_metadata,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Close and flush the workflow span only on the fully-successful path.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    Ok(output)
}
3670
3671async fn try_run_yaml_via_ir_runtime(
3672    workflow: &YamlWorkflow,
3673    workflow_input: &Value,
3674    executor: &dyn YamlWorkflowLlmExecutor,
3675    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3676    options: &YamlWorkflowRunOptions,
3677) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
3678    let ir = match yaml_workflow_to_ir(workflow) {
3679        Ok(def) => def,
3680        Err(YamlToIrError::UnsupportedNode { .. })
3681        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
3682        Err(err) => {
3683            return Err(YamlWorkflowRunError::InvalidInput {
3684                message: err.to_string(),
3685            });
3686        }
3687    };
3688
3689    let parent_trace_context = trace_context_from_options(options);
3690    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3691
3692    let tracer = workflow_tracer();
3693    let mut workflow_span_context: Option<TraceContext> = None;
3694    let mut workflow_span = if telemetry_context.sampled {
3695        let (span_context, mut span) = tracer.start_span(
3696            "workflow.run",
3697            SpanKind::Workflow,
3698            parent_trace_context.as_ref(),
3699        );
3700        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
3701            span.set_attribute("trace_id", trace_id_value);
3702        }
3703        apply_trace_tenant_attributes(span.as_mut(), options);
3704        workflow_span_context = Some(span_context);
3705        Some(span)
3706    } else {
3707        None
3708    };
3709
3710    struct NoopLlm;
3711    #[async_trait]
3712    impl LlmExecutor for NoopLlm {
3713        async fn execute(
3714            &self,
3715            _input: LlmExecutionInput,
3716        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3717            Err(LlmExecutionError::UnexpectedOutcome(
3718                "yaml_ir_uses_tool_path",
3719            ))
3720        }
3721    }
3722
3723    struct YamlIrToolExecutor<'a> {
3724        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
3725        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
3726        token_totals: std::sync::Mutex<YamlTokenTotals>,
3727        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
3728        trace_id: Option<String>,
3729        trace_context: Option<TraceContext>,
3730        trace_input_context: Option<YamlWorkflowTraceContextInput>,
3731        tenant_context: YamlWorkflowTraceTenantContext,
3732        payload_mode: YamlWorkflowPayloadMode,
3733        trace_sampled: bool,
3734    }
3735
3736    #[async_trait]
3737    impl ToolExecutor for YamlIrToolExecutor<'_> {
3738        async fn execute_tool(
3739            &self,
3740            input: ToolExecutionInput,
3741        ) -> Result<Value, ToolExecutionError> {
3742            let context = build_yaml_context_from_ir_scope(&input.scoped_input);
3743
3744            if input.tool == YAML_LLM_TOOL_ID {
3745                let node_id = input
3746                    .input
3747                    .get("node_id")
3748                    .and_then(Value::as_str)
3749                    .ok_or_else(|| {
3750                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
3751                    })?
3752                    .to_string();
3753                let node_id_for_metrics = node_id.clone();
3754                let model = input
3755                    .input
3756                    .get("model")
3757                    .and_then(Value::as_str)
3758                    .ok_or_else(|| {
3759                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
3760                    })?
3761                    .to_string();
3762                let prompt_template = input
3763                    .input
3764                    .get("prompt_template")
3765                    .and_then(Value::as_str)
3766                    .unwrap_or_default()
3767                    .to_string();
3768                let stream = input
3769                    .input
3770                    .get("stream")
3771                    .and_then(Value::as_bool)
3772                    .unwrap_or(false);
3773                let heal = input
3774                    .input
3775                    .get("heal")
3776                    .and_then(Value::as_bool)
3777                    .unwrap_or(false);
3778                let append_prompt_as_user = input
3779                    .input
3780                    .get("append_prompt_as_user")
3781                    .and_then(Value::as_bool)
3782                    .unwrap_or(true);
3783                let messages_path = input
3784                    .input
3785                    .get("messages_path")
3786                    .and_then(Value::as_str)
3787                    .map(str::to_string);
3788
3789                let messages = if let Some(path) = messages_path.as_deref() {
3790                    Some(
3791                        parse_messages_from_context(path, &context)
3792                            .map_err(ToolExecutionError::Failed)?,
3793                    )
3794                } else {
3795                    None
3796                };
3797
3798                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
3799                let prompt = interpolate_template(&prompt_template, &context);
3800                let email_text = context
3801                    .get("input")
3802                    .and_then(|v| v.get("email_text"))
3803                    .and_then(Value::as_str)
3804                    .unwrap_or_default();
3805                let schema = input
3806                    .input
3807                    .get("output_schema")
3808                    .cloned()
3809                    .unwrap_or_else(default_llm_output_schema);
3810
3811                let request = YamlLlmExecutionRequest {
3812                    node_id,
3813                    is_terminal_node: false,
3814                    stream_json_as_text: input
3815                        .input
3816                        .get("stream_json_as_text")
3817                        .and_then(Value::as_bool)
3818                        .unwrap_or(false),
3819                    model,
3820                    messages,
3821                    append_prompt_as_user,
3822                    prompt,
3823                    prompt_template,
3824                    prompt_bindings,
3825                    schema,
3826                    stream,
3827                    heal,
3828                    tools: Vec::new(),
3829                    tool_choice: None,
3830                    max_tool_roundtrips: 1,
3831                    tool_calls_global_key: None,
3832                    tool_trace_mode: YamlToolTraceMode::Off,
3833                    execution_context: context.clone(),
3834                    email_text: email_text.to_string(),
3835                    trace_id: self.trace_id.clone(),
3836                    trace_context: self.trace_context.clone(),
3837                    trace_sampled: self.trace_sampled,
3838                };
3839
3840                let llm_result = self
3841                    .llm_executor
3842                    .complete_structured(request, None)
3843                    .await
3844                    .map_err(ToolExecutionError::Failed);
3845
3846                if let Ok(ref result) = llm_result {
3847                    if let Some(usage) = result.usage.as_ref() {
3848                        if let Ok(mut totals) = self.token_totals.lock() {
3849                            totals.add_usage(usage);
3850                        }
3851                        if let Ok(mut usage_map) = self.node_usage.lock() {
3852                            usage_map.insert(node_id_for_metrics, usage.clone());
3853                        }
3854                    }
3855                }
3856
3857                return llm_result.map(|result| result.payload);
3858            }
3859
3860            let worker = self
3861                .custom_worker
3862                .ok_or_else(|| ToolExecutionError::NotFound {
3863                    tool: input.tool.clone(),
3864                })?;
3865
3866            let payload = input.input.clone();
3867            let email_text = context
3868                .get("input")
3869                .and_then(|v| v.get("email_text"))
3870                .and_then(Value::as_str)
3871                .unwrap_or_default();
3872
3873            let tracer = workflow_tracer();
3874            let mut handler_span_context: Option<TraceContext> = None;
3875            let mut handler_span = if self.trace_sampled {
3876                let (span_context, mut span) = tracer.start_span(
3877                    "handler.invoke",
3878                    SpanKind::Node,
3879                    self.trace_context.as_ref(),
3880                );
3881                handler_span_context = Some(span_context);
3882                if let Some(trace_id) = self.trace_id.as_ref() {
3883                    span.set_attribute("trace_id", trace_id.as_str());
3884                }
3885                span.set_attribute("handler_name", input.tool.as_str());
3886                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
3887                    span.set_attribute("tenant.workspace_id", workspace_id);
3888                }
3889                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
3890                    span.set_attribute("tenant.user_id", user_id);
3891                }
3892                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
3893                    span.set_attribute("tenant.conversation_id", conversation_id);
3894                }
3895                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
3896                    span.set_attribute("tenant.request_id", request_id);
3897                }
3898                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
3899                    span.set_attribute("tenant.run_id", run_id);
3900                }
3901                span.set_attribute(
3902                    "node_input",
3903                    payload_for_span(self.payload_mode, &payload).as_str(),
3904                );
3905                Some(span)
3906            } else {
3907                None
3908            };
3909
3910            let trace_options = YamlWorkflowRunOptions {
3911                telemetry: YamlWorkflowTelemetryConfig::default(),
3912                trace: YamlWorkflowTraceOptions {
3913                    context: self.trace_input_context.clone(),
3914                    tenant: self.tenant_context.clone(),
3915                },
3916                model: None,
3917            };
3918            let worker_trace_context = merged_trace_context_for_worker(
3919                handler_span_context.as_ref(),
3920                self.trace_id.as_deref(),
3921                &trace_options,
3922            );
3923            let worker_context = custom_worker_context_with_trace(
3924                &context,
3925                &worker_trace_context,
3926                &self.tenant_context,
3927            );
3928
3929            let output_result = worker
3930                .execute(&input.tool, &payload, email_text, &worker_context)
3931                .await
3932                .map_err(ToolExecutionError::Failed);
3933
3934            if let Some(span) = handler_span.as_mut() {
3935                if output_result.is_ok() {
3936                    span.add_event("handler.success");
3937                } else {
3938                    span.add_event("handler.error");
3939                }
3940            }
3941
3942            if let Some(span) = handler_span.take() {
3943                span.end();
3944            }
3945
3946            output_result
3947        }
3948    }
3949
3950    let tool_executor = YamlIrToolExecutor {
3951        llm_executor: executor,
3952        custom_worker,
3953        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
3954        node_usage: std::sync::Mutex::new(BTreeMap::new()),
3955        trace_id: telemetry_context.trace_id.clone(),
3956        trace_context: workflow_span_context.clone(),
3957        trace_input_context: options.trace.context.clone(),
3958        tenant_context: options.trace.tenant.clone(),
3959        payload_mode: options.telemetry.payload_mode,
3960        trace_sampled: telemetry_context.sampled,
3961    };
3962
3963    let runtime = WorkflowRuntime::new(
3964        ir,
3965        &NoopLlm,
3966        Some(&tool_executor),
3967        WorkflowRuntimeOptions::default(),
3968    );
3969
3970    let started = Instant::now();
3971    let result = match runtime.execute(workflow_input.clone(), None).await {
3972        Ok(result) => result,
3973        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
3974        Err(error) => {
3975            return Err(YamlWorkflowRunError::IrRuntime {
3976                message: error.to_string(),
3977            });
3978        }
3979    };
3980    let total_elapsed_ms = started.elapsed().as_millis();
3981
3982    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
3983    for (node_id, output) in result.node_outputs {
3984        if node_id == YAML_START_NODE_ID {
3985            continue;
3986        }
3987        outputs.insert(node_id, json!({"output": output}));
3988    }
3989
3990    let mut trace = Vec::new();
3991    let mut step_timings = Vec::new();
3992    let node_usage_map = tool_executor
3993        .node_usage
3994        .lock()
3995        .map(|usage| usage.clone())
3996        .unwrap_or_default();
3997    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
3998    for execution in result.node_executions {
3999        if execution.node_id == YAML_START_NODE_ID {
4000            continue;
4001        }
4002        trace.push(execution.node_id.clone());
4003        let usage = node_usage_map.get(&execution.node_id);
4004        if let Some(usage) = usage {
4005            llm_node_metrics.insert(
4006                execution.node_id.clone(),
4007                YamlLlmNodeMetrics {
4008                    elapsed_ms: 0,
4009                    prompt_tokens: usage.prompt_tokens,
4010                    completion_tokens: usage.completion_tokens,
4011                    total_tokens: usage.total_tokens,
4012                    thinking_tokens: usage.thinking_tokens,
4013                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
4014                },
4015            );
4016        }
4017        step_timings.push(YamlStepTiming {
4018            node_id: execution.node_id,
4019            node_kind: "ir_runtime".to_string(),
4020            elapsed_ms: 0,
4021            prompt_tokens: usage.map(|value| value.prompt_tokens),
4022            completion_tokens: usage.map(|value| value.completion_tokens),
4023            total_tokens: usage.map(|value| value.total_tokens),
4024            thinking_tokens: usage.and_then(|value| value.thinking_tokens),
4025            tokens_per_second: usage
4026                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
4027        });
4028    }
4029
4030    let terminal_node = result.terminal_node_id;
4031    let terminal_output = outputs
4032        .get(&terminal_node)
4033        .and_then(|v| v.get("output"))
4034        .cloned();
4035
4036    let email_text = workflow_input
4037        .get("email_text")
4038        .and_then(Value::as_str)
4039        .unwrap_or_default()
4040        .to_string();
4041
4042    let token_totals = tool_executor
4043        .token_totals
4044        .lock()
4045        .map(|totals| totals.clone())
4046        .unwrap_or_default();
4047
4048    if let Some(mut span) = workflow_span.take() {
4049        span.set_attribute("workflow_id", workflow.id.as_str());
4050        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
4051            span.set_attribute("trace_id", trace_id_value.as_str());
4052        }
4053        span.end();
4054        flush_workflow_tracer();
4055    }
4056
4057    Ok(Some(YamlWorkflowRunOutput {
4058        workflow_id: workflow.id.clone(),
4059        entry_node: workflow.entry_node.clone(),
4060        email_text,
4061        trace,
4062        outputs,
4063        terminal_node,
4064        terminal_output,
4065        step_timings,
4066        llm_node_metrics,
4067        total_elapsed_ms,
4068        ttft_ms: None,
4069        total_input_tokens: token_totals.input_tokens,
4070        total_output_tokens: token_totals.output_tokens,
4071        total_tokens: token_totals.total_tokens,
4072        total_thinking_tokens: token_totals.thinking_tokens,
4073        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
4074        trace_id: telemetry_context.trace_id.clone(),
4075        metadata: telemetry_context.trace_id.as_ref().map(|value| {
4076            workflow_metadata_with_trace(
4077                options,
4078                value,
4079                telemetry_context.sampled,
4080                telemetry_context.trace_id_source,
4081            )
4082        }),
4083    }))
4084}
4085
4086fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4087    let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4088
4089    let mut nodes = serde_json::Map::new();
4090    if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4091        for (node_id, output) in node_outputs {
4092            nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4093        }
4094    }
4095
4096    json!({
4097        "input": input,
4098        "nodes": Value::Object(nodes),
4099        "globals": Value::Object(serde_json::Map::new())
4100    })
4101}
4102
4103pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4104    workflow: &YamlWorkflow,
4105    email_text: &str,
4106    executor: &dyn YamlWorkflowLlmExecutor,
4107    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4108    event_sink: Option<&dyn YamlWorkflowEventSink>,
4109) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4110    let workflow_input = json!({ "email_text": email_text });
4111    run_workflow_yaml_with_custom_worker_and_events(
4112        workflow,
4113        &workflow_input,
4114        executor,
4115        custom_worker,
4116        event_sink,
4117    )
4118    .await
4119}
4120
4121fn evaluate_switch_condition(
4122    condition: &str,
4123    context: &Value,
4124) -> Result<bool, YamlWorkflowRunError> {
4125    let (left, right) =
4126        condition
4127            .split_once("==")
4128            .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4129                condition: condition.to_string(),
4130            })?;
4131
4132    let left_path = left.trim().trim_start_matches("$.");
4133    let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4134    let left_value = resolve_path(context, left_path);
4135    Ok(left_value
4136        .and_then(Value::as_str)
4137        .map(|value| value == right_literal)
4138        .unwrap_or(false))
4139}
4140
4141fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4142    let normalized_path = path.trim().trim_start_matches("$.");
4143    let value = resolve_path(context, normalized_path)
4144        .ok_or_else(|| format!("messages_path not found: {path}"))?;
4145    let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4146        format!("messages_path must resolve to a list of messages: {path}; {err}")
4147    })?;
4148    if list.is_empty() {
4149        return Err(format!(
4150            "messages_path must not resolve to an empty list: {path}"
4151        ));
4152    }
4153
4154    let mut messages = Vec::with_capacity(list.len());
4155    for (index, item) in list.into_iter().enumerate() {
4156        let mut message = match item.role {
4157            Role::System => Message::system(item.content),
4158            Role::User => Message::user(item.content),
4159            Role::Assistant => Message::assistant(item.content),
4160            Role::Tool => {
4161                let tool_call_id = item
4162                    .tool_call_id
4163                    .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4164                Message::tool(item.content, tool_call_id)
4165            }
4166        };
4167
4168        if let Some(name) = item.name {
4169            message = message.with_name(name);
4170        }
4171
4172        messages.push(message);
4173    }
4174
4175    Ok(messages)
4176}
4177
/// Statically checks a YAML workflow and collects every diagnostic found.
///
/// Verifies that the entry node and all edge endpoints exist, that each
/// `llm_call` node has a usable configuration (non-empty model, coherent
/// stream/heal flags, valid tool declarations and schemas), that `switch`
/// branches and defaults target known nodes, and that `update_globals`
/// entries use a known op with the required `from` source. Returns an empty
/// vec when all checks pass; callers distinguish warnings from errors via
/// each diagnostic's `severity`.
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
    let mut diagnostics = Vec::new();
    // Node lookup table so each reference check below is O(1).
    let known_ids: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();

    // The workflow must start at a declared node.
    if !known_ids.contains_key(workflow.entry_node.as_str()) {
        diagnostics.push(YamlWorkflowDiagnostic {
            node_id: None,
            code: "missing_entry".to_string(),
            severity: YamlWorkflowDiagnosticSeverity::Error,
            message: format!("entry node '{}' does not exist", workflow.entry_node),
        });
    }

    // Both endpoints of every edge must be declared nodes.
    for edge in &workflow.edges {
        if !known_ids.contains_key(edge.from.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.from.clone()),
                code: "unknown_edge_from".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.from '{}' does not exist", edge.from),
            });
        }
        if !known_ids.contains_key(edge.to.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.to.clone()),
                code: "unknown_edge_to".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.to '{}' does not exist", edge.to),
            });
        }
    }

    for node in &workflow.nodes {
        // --- llm_call node checks ---
        if let Some(llm) = &node.node_type.llm_call {
            if llm.model.trim().is_empty() {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "empty_model".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.model must not be empty".to_string(),
                });
            }
            // stream + heal conflict: warning only, since the runtime
            // resolves it by disabling streaming rather than failing.
            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "stream_heal_conflict".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Warning,
                    message:
                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
                            .to_string(),
                });
            }

            // Zero roundtrips would make any tool call impossible to complete.
            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "invalid_max_tool_roundtrips".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
                });
            }

            // If a global key for recorded tool calls is given, it must be non-blank.
            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if global_key.trim().is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_calls_global_key".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
                    });
                }
            }

            // A forced tool_choice must reference a tool declared on this
            // node, matched against the node's declared tools_format.
            match normalize_tool_choice(llm.tool_choice.clone()) {
                Ok(choice) => {
                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
                                openai.function.name == choice_tool.function.name
                            }
                            (
                                YamlToolFormat::Simplified,
                                YamlToolDeclaration::Simplified(simple),
                            ) => simple.name == choice_tool.function.name,
                            _ => false,
                        }) {
                            diagnostics.push(YamlWorkflowDiagnostic {
                                node_id: Some(node.id.clone()),
                                code: "unknown_tool_choice_function".to_string(),
                                severity: YamlWorkflowDiagnosticSeverity::Error,
                                message: format!(
                                    "llm_call.tool_choice references unknown function '{}'",
                                    choice_tool.function.name
                                ),
                            });
                        }
                    }
                }
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_choice".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                }
            }

            // Normalize tool declarations; on a format error, record it and
            // skip the per-tool checks below (empty list).
            let normalized_tools = match normalize_llm_tools(llm) {
                Ok(tools) => tools,
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tools_format".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                    Vec::new()
                }
            };

            let mut seen_tool_names = HashSet::new();
            for tool in &normalized_tools {
                let name = tool.definition.function.name.trim();
                if name.is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "tool function name must not be empty".to_string(),
                    });
                }
                // NOTE(review): the duplicate check uses the untrimmed name,
                // so names differing only by surrounding whitespace are not
                // flagged as duplicates.
                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "duplicate_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "duplicate tool function name '{}' in node",
                            tool.definition.function.name
                        ),
                    });
                }

                // Every tool needs a syntactically valid input schema.
                let schema = tool
                    .definition
                    .function
                    .parameters
                    .clone()
                    .unwrap_or(Value::Null);
                if schema.is_null() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "missing_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' is missing input schema",
                            tool.definition.function.name
                        ),
                    });
                } else if let Err(message) = validate_json_schema(&schema) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' has invalid input schema: {}",
                            tool.definition.function.name, message
                        ),
                    });
                }

                // Output schema is optional, but when present it must be valid.
                if let Some(output_schema) = tool.output_schema.as_ref() {
                    if let Err(message) = validate_json_schema(output_schema) {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "invalid_tool_output_schema".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "tool '{}' has invalid output schema: {}",
                                tool.definition.function.name, message
                            ),
                        });
                    }
                }
            }
        }

        // --- switch node checks: every branch target and the default must exist ---
        if let Some(switch) = &node.node_type.switch {
            for branch in &switch.branches {
                if !known_ids.contains_key(branch.target.as_str()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "unknown_switch_target".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!("switch branch target '{}' does not exist", branch.target),
                    });
                }
            }
            if !known_ids.contains_key(switch.default.as_str()) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "unknown_switch_default".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: format!("switch default target '{}' does not exist", switch.default),
                });
            }
        }

        // --- update_globals checks: known op, and 'from' required for all ops except 'increment' ---
        if let Some(config) = node.config.as_ref() {
            if let Some(update_globals) = config.update_globals.as_ref() {
                for (key, update) in update_globals {
                    let is_valid_op =
                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
                    if !is_valid_op {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "unknown_update_op".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
                                key, update.op
                            ),
                        });
                    }

                    if update.op != "increment" && update.from.is_none() {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "missing_update_from".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' with op '{}' requires 'from'",
                                key, update.op
                            ),
                        });
                    }
                }
            }
        }
    }

    diagnostics
}
4426
4427fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4428    path.split('.')
4429        .filter(|segment| !segment.is_empty())
4430        .try_fold(value, |current, segment| {
4431            if let Ok(index) = segment.parse::<usize>() {
4432                return current.get(index);
4433            }
4434            current.get(segment)
4435        })
4436}
4437
4438fn interpolate_template(template: &str, context: &Value) -> String {
4439    let mut out = String::with_capacity(template.len());
4440    let mut rest = template;
4441
4442    loop {
4443        let Some(start) = rest.find("{{") else {
4444            out.push_str(rest);
4445            break;
4446        };
4447
4448        out.push_str(&rest[..start]);
4449        let after_start = &rest[start + 2..];
4450        let Some(end) = after_start.find("}}") else {
4451            out.push_str(&rest[start..]);
4452            break;
4453        };
4454
4455        let expr = after_start[..end].trim();
4456        let source_path = expr.trim_start_matches("$.");
4457        let replacement = resolve_path(context, source_path)
4458            .map(value_to_template_string)
4459            .unwrap_or_default();
4460        out.push_str(replacement.as_str());
4461
4462        rest = &after_start[end + 2..];
4463    }
4464
4465    out
4466}
4467
4468fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4469    let mut bindings = Vec::new();
4470    let mut rest = template;
4471
4472    loop {
4473        let Some(start) = rest.find("{{") else {
4474            break;
4475        };
4476
4477        let after_start = &rest[start + 2..];
4478        let Some(end) = after_start.find("}}") else {
4479            break;
4480        };
4481
4482        let expr = after_start[..end].trim();
4483        let source_path = expr.trim_start_matches("$.").to_string();
4484        let resolved = resolve_path(context, source_path.as_str()).cloned();
4485        let missing = resolved.is_none();
4486        let resolved_value = resolved.unwrap_or(Value::Null);
4487        bindings.push(YamlTemplateBinding {
4488            index: bindings.len(),
4489            expression: expr.to_string(),
4490            source_path,
4491            resolved_type: json_type_name(&resolved_value).to_string(),
4492            missing,
4493            resolved: resolved_value,
4494        });
4495
4496        rest = &after_start[end + 2..];
4497    }
4498
4499    bindings
4500}
4501
4502fn json_type_name(value: &Value) -> &'static str {
4503    match value {
4504        Value::Null => "null",
4505        Value::Bool(_) => "bool",
4506        Value::Number(_) => "number",
4507        Value::String(_) => "string",
4508        Value::Array(_) => "array",
4509        Value::Object(_) => "object",
4510    }
4511}
4512
4513fn value_to_template_string(value: &Value) -> String {
4514    match value {
4515        Value::Null => String::new(),
4516        Value::Bool(v) => v.to_string(),
4517        Value::Number(v) => v.to_string(),
4518        Value::String(v) => v.clone(),
4519        Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4520    }
4521}
4522
4523fn apply_set_globals(
4524    node: &YamlNode,
4525    outputs: &BTreeMap<String, Value>,
4526    workflow_input: &Value,
4527    globals: &mut serde_json::Map<String, Value>,
4528) {
4529    let Some(config) = node.config.as_ref() else {
4530        return;
4531    };
4532    let Some(set_globals) = config.set_globals.as_ref() else {
4533        return;
4534    };
4535
4536    let context = json!({
4537        "input": workflow_input,
4538        "nodes": outputs,
4539        "globals": Value::Object(globals.clone())
4540    });
4541
4542    for (key, expr) in set_globals {
4543        let value = resolve_path(&context, expr.as_str())
4544            .cloned()
4545            .unwrap_or(Value::Null);
4546        globals.insert(key.clone(), value);
4547    }
4548}
4549
/// Applies a node's `update_globals` directives to the globals map.
///
/// `from` paths resolve against a context of the workflow `input`, per-node
/// `nodes` outputs, and a snapshot of `globals` taken before any directive in
/// this set runs — so directives do not observe each other's writes.
/// Supported ops: `set`, `append`, `increment` (by `by`, default 1.0), and
/// `merge`. Unknown ops are silently skipped here; validation reports them as
/// `unknown_update_op` diagnostics separately.
fn apply_update_globals(
    node: &YamlNode,
    outputs: &BTreeMap<String, Value>,
    workflow_input: &Value,
    globals: &mut serde_json::Map<String, Value>,
) {
    let Some(config) = node.config.as_ref() else {
        return;
    };
    let Some(update_globals) = config.update_globals.as_ref() else {
        return;
    };

    // Pre-loop snapshot of globals; later directives read the original state.
    let context = json!({
        "input": workflow_input,
        "nodes": outputs,
        "globals": Value::Object(globals.clone())
    });

    for (key, update) in update_globals {
        match update.op.as_str() {
            "set" => {
                // Overwrite with the resolved value; missing paths store null.
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    globals.insert(key.clone(), value);
                }
            }
            "append" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    let entry = globals
                        .entry(key.clone())
                        .or_insert_with(|| Value::Array(Vec::new()));
                    match entry {
                        Value::Array(items) => items.push(value),
                        other => {
                            // Promote a non-array value to [existing, new].
                            let existing = other.clone();
                            *other = Value::Array(vec![existing, value]);
                        }
                    }
                }
            }
            "increment" => {
                let by = update.by.unwrap_or(1.0);
                let current = globals
                    .get(key.as_str())
                    .and_then(Value::as_f64)
                    .unwrap_or(0.0);
                // from_f64 yields None for NaN/infinity; the key is then left
                // untouched rather than storing an invalid JSON number.
                if let Some(next) = serde_json::Number::from_f64(current + by) {
                    globals.insert(key.clone(), Value::Number(next));
                }
            }
            "merge" => {
                if let Some(path) = update.from.as_ref() {
                    let source = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    // Only object sources merge; any other source is ignored.
                    if let Value::Object(source_map) = source {
                        let target = globals
                            .entry(key.clone())
                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
                        if let Value::Object(target_map) = target {
                            // Shallow merge: source keys overwrite target keys.
                            target_map.extend(source_map);
                        } else {
                            // Non-object target is replaced wholesale.
                            *target = Value::Object(source_map);
                        }
                    }
                }
            }
            _ => {}
        }
    }
}
4627
4628fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4629    if let Some(schema) = node
4630        .config
4631        .as_ref()
4632        .and_then(|cfg| cfg.output_schema.clone())
4633    {
4634        return schema;
4635    }
4636
4637    default_llm_output_schema()
4638}
4639
4640fn normalize_tool_choice(
4641    config: Option<YamlToolChoiceConfig>,
4642) -> Result<Option<ToolChoice>, String> {
4643    let Some(config) = config else {
4644        return Ok(None);
4645    };
4646
4647    let choice = match config {
4648        YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4649        YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4650            tool_type: ToolType::Function,
4651            function: ToolChoiceFunction { name: function },
4652        }),
4653        YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4654    };
4655
4656    Ok(Some(choice))
4657}
4658
4659fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4660    llm.tools
4661        .iter()
4662        .map(|tool| match (llm.tools_format, tool) {
4663            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4664                let definition = ToolDefinition {
4665                    tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4666                    function: ToolFunction {
4667                        name: openai.function.name.clone(),
4668                        description: openai.function.description.clone(),
4669                        parameters: openai.function.parameters.clone(),
4670                    },
4671                };
4672                Ok(YamlResolvedTool {
4673                    definition,
4674                    output_schema: openai.function.output_schema.clone(),
4675                })
4676            }
4677            (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4678                let definition = ToolDefinition {
4679                    tool_type: ToolType::Function,
4680                    function: ToolFunction {
4681                        name: simple.name.clone(),
4682                        description: simple.description.clone(),
4683                        parameters: Some(simple.input_schema.clone()),
4684                    },
4685                };
4686                Ok(YamlResolvedTool {
4687                    definition,
4688                    output_schema: simple.output_schema.clone(),
4689                })
4690            }
4691            (YamlToolFormat::Openai, _) => {
4692                Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4693            }
4694            (YamlToolFormat::Simplified, _) => {
4695                Err("tools_format=simplified requires simplified tool declarations".to_string())
4696            }
4697        })
4698        .collect()
4699}
4700
4701fn default_llm_output_schema() -> Value {
4702    json!({
4703        "type": "object",
4704        "additionalProperties": true
4705    })
4706}
4707
4708fn mock_rag(topic: &str) -> Value {
4709    let (kb_source, playbook) = match topic {
4710        "probation" => (
4711            "hr_policy/probation.md",
4712            "Collect manager review, performance evidence, and probation timeline.",
4713        ),
4714        "leave_request" => (
4715            "hr_policy/leave.md",
4716            "Validate leave balance, manager approval, and blackout dates.",
4717        ),
4718        "supply_chain_order_assessment" => (
4719            "supply_chain/order_assessment.md",
4720            "Review order specs, inventory risk, and vendor lead-time guidance.",
4721        ),
4722        "supply_chain_order_replacement" => (
4723            "supply_chain/order_replacement.md",
4724            "Collect order id, damage proof, and replacement SLA policy.",
4725        ),
4726        "termination_first_time_offense" => (
4727            "hr_policy/termination_first_offense.md",
4728            "Validate first-incident criteria and route to HRBP review.",
4729        ),
4730        "termination_repeated_offense" => (
4731            "hr_policy/termination_repeated_offense.md",
4732            "Collect prior warnings and escalation approvals before final action.",
4733        ),
4734        _ => (
4735            "shared/request_clarification.md",
4736            "Request clarifying details before routing.",
4737        ),
4738    };
4739
4740    json!({
4741        "kb_source": kb_source,
4742        "playbook": playbook,
4743    })
4744}
4745
/// Executes a `run_workflow_graph` tool call by running a nested YAML
/// workflow.
///
/// The target YAML path comes from `input.workflow_registry` keyed by
/// `payload.workflow_id`. Recursion is bounded by `__subgraph_depth` /
/// `__subgraph_max_depth` (default max 3) threaded through the child input.
/// The child inherits `messages`, `email_text`, and the registry from the
/// parent input unless the payload already provides them. On success returns
/// an object bundling the child's terminal node, terminal output, and trace;
/// all failures are reported as `Err(String)`.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Depth guard: refuse to recurse past the configured maximum.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    // Inherit conversational context from the parent unless overridden.
    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    // Pass the registry along so nested workflows can launch further subgraphs.
    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    // Child inherits the parent's run options plus propagated trace identity.
    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
4855
4856fn build_subworkflow_options(
4857    parent_options: &YamlWorkflowRunOptions,
4858    parent_trace_context: Option<&TraceContext>,
4859    resolved_trace_id: Option<&str>,
4860) -> YamlWorkflowRunOptions {
4861    let mut subworkflow_options = parent_options.clone();
4862    if parent_trace_context.is_some() || resolved_trace_id.is_some() {
4863        let trace_context = YamlWorkflowTraceContextInput {
4864            trace_id: resolved_trace_id
4865                .map(|value| value.to_string())
4866                .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
4867            span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
4868            parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
4869            traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
4870            tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
4871            baggage: parent_trace_context
4872                .map(|ctx| ctx.baggage.clone())
4873                .unwrap_or_default(),
4874        };
4875        subworkflow_options.trace.context = Some(trace_context);
4876    }
4877    subworkflow_options
4878}
4879
4880fn mock_custom_worker_output(
4881    handler: &str,
4882    payload: &Value,
4883) -> Result<Value, YamlWorkflowRunError> {
4884    if handler == "get_employee_record" {
4885        let employee_name = payload
4886            .get("employee_name")
4887            .and_then(Value::as_str)
4888            .unwrap_or("Unknown Employee")
4889            .trim();
4890        let normalized_name = if employee_name.is_empty() {
4891            "Unknown Employee"
4892        } else {
4893            employee_name
4894        };
4895
4896        let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
4897            "alex johnson" => ("EMP-2041", "Austin"),
4898            "priya sharma" => ("EMP-3378", "Bengaluru"),
4899            "marcus lee" => ("EMP-1196", "Singapore"),
4900            "sarah chen" => ("EMP-4450", "Toronto"),
4901            _ => ("EMP-0000", "Unassigned"),
4902        };
4903
4904        return Ok(json!({
4905            "employee_name": normalized_name,
4906            "employee_id": employee_id,
4907            "location": location,
4908        }));
4909    }
4910
4911    if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
4912        let mut value = mock_rag(topic);
4913        if let Value::Object(object) = &mut value {
4914            object.insert("handler".to_string(), Value::String(handler.to_string()));
4915        }
4916        return Ok(value);
4917    }
4918
4919    Err(YamlWorkflowRunError::UnsupportedCustomHandler {
4920        handler: handler.to_string(),
4921    })
4922}
4923
/// Top-level YAML workflow document: a graph of nodes plus explicit edges,
/// entered at `entry_node`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    pub id: String,
    /// Id of the node where execution starts.
    pub entry_node: String,
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
4933
/// A single workflow node: an id, its kind payload, and optional node-level
/// config (prompt, schemas, global updates).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    pub id: String,
    pub node_type: YamlNodeType,
    pub config: Option<YamlNodeConfig>,
}
4940
4941impl YamlNode {
4942    fn kind_name(&self) -> &'static str {
4943        if self.node_type.llm_call.is_some() {
4944            "llm_call"
4945        } else if self.node_type.switch.is_some() {
4946            "switch"
4947        } else if self.node_type.custom_worker.is_some() {
4948            "custom_worker"
4949        } else {
4950            "unknown"
4951        }
4952    }
4953}
4954
/// Node kind tagged by presence: whichever field is `Some` determines how the
/// node executes (see `YamlNode::kind_name` for the reporting precedence).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
}
4961
/// Configuration for an `llm_call` node as declared in workflow YAML.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model identifier forwarded to the completion client.
    pub model: String,
    /// Opt into streamed completion output.
    pub stream: Option<bool>,
    // NOTE(review): the exact runner semantics of stream_json_as_text, heal,
    // messages_path, and append_prompt_as_user live outside this chunk —
    // confirm against the execution path before documenting further.
    pub stream_json_as_text: Option<bool>,
    pub heal: Option<bool>,
    pub messages_path: Option<String>,
    pub append_prompt_as_user: Option<bool>,
    /// Declared wire format of `tools` (defaults to OpenAI style).
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tool declarations; shape-checked against `tools_format` by
    /// `normalize_llm_tools`.
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional tool-choice directive (see `normalize_tool_choice`).
    pub tool_choice: Option<YamlToolChoiceConfig>,
    pub max_tool_roundtrips: Option<u8>,
    pub tool_calls_global_key: Option<String>,
}
4978
/// Wire format used for a node's tool declarations; defaults to OpenAI-style.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}
4986
/// A tool declaration in either OpenAI or simplified shape. `untagged`: serde
/// picks the first variant whose fields match the YAML.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
4993
/// OpenAI-style tool declaration: optional `type` (defaults to function
/// during normalization) plus the function spec.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
5000
/// Function portion of an OpenAI-style tool declaration. `output_schema` is a
/// local extension carried through to `YamlResolvedTool`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    pub parameters: Option<Value>,
    pub output_schema: Option<Value>,
}
5008
/// Simplified tool declaration: flat name/description with a required input
/// schema (mapped to function parameters during normalization).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}
5016
/// Tool-choice setting: a bare mode string, a `{ function: name }` shorthand,
/// or a full OpenAI tool-choice object. `untagged`: first matching variant
/// wins.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
5024
/// A `switch` node: ordered condition branches plus a required default target.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id when no branch condition matches.
    pub default: String,
}
5031
/// One switch branch: a condition expression and the node id to jump to when
/// it holds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}
5037
/// A `custom_worker` node: names the handler dispatched to the configured
/// `YamlWorkflowCustomWorkerExecutor` (or the built-in mock).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}
5042
/// Optional node-level configuration shared across node kinds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt template (supports `{{ path }}` interpolation).
    pub prompt: Option<String>,
    /// JSON schema constraining LLM output; `schema` is accepted as an alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Arbitrary JSON payload; usage depends on node kind.
    pub payload: Option<Value>,
    /// Global key -> context path copied by `apply_set_globals`.
    pub set_globals: Option<HashMap<String, String>>,
    /// Global key -> update directive applied by `apply_update_globals`.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
5052
/// One `update_globals` directive: `op` is set|append|increment|merge, `from`
/// is the source context path (validation requires it for every op except
/// `increment`), and `by` is the increment amount (defaults to 1.0).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    pub op: String,
    pub from: Option<String>,
    pub by: Option<f64>,
}
5059
/// Directed edge between two node ids in the workflow graph.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
5065
5066#[cfg(test)]
5067mod tests {
5068    use super::*;
5069    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5070    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5071    use simple_agent_type::tool::{ToolCallFunction, ToolType};
5072    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5073    use simple_agents_core::SimpleAgentsClientBuilder;
5074    use std::fs;
5075    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5076    use std::sync::{Arc, Mutex, OnceLock};
5077
5078    fn stream_debug_env_lock() -> &'static Mutex<()> {
5079        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5080        LOCK.get_or_init(|| Mutex::new(()))
5081    }
5082
    /// LLM executor stub returning canned payloads keyed off prompt contents.
    struct MockExecutor;
5084
    /// Event sink that records every emitted workflow event for assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }
5088
    /// Event sink that flips its cancellation flag on the first emitted event.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }
5092
    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        /// Any emitted event marks the sink cancelled.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }
5102
    /// LLM executor that counts invocations and returns a fixed payload.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }
5106
    /// Custom worker that captures the context it was invoked with.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }
5110
    /// Provider fixture that first requests a tool call, then completes.
    struct ToolLoopProvider;
5112
    /// Provider fixture that always requests a tool the workflow never declared.
    struct UnknownToolProvider;
5114
    #[async_trait]
    impl Provider for ToolLoopProvider {
        fn name(&self) -> &str {
            "openai"
        }

        /// Round-trips the request through JSON so `execute` can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        /// Three-phase fixture: (1) tools declared but no tool result yet ->
        /// request a `get_customer_context` call; (2) a tool-role message is
        /// present -> finish with `{"state":"done"}`; (3) no tools -> echo the
        /// last user prompt inside a JSON body.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // Phase 1: ask the runtime to execute one tool call.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Phase 2: tool result arrived; finish the conversation.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Phase 3: plain completion echoing the latest user prompt.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5210
    #[async_trait]
    impl Provider for UnknownToolProvider {
        fn name(&self) -> &str {
            "openai"
        }

        /// Round-trips the request through JSON so `execute` can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
        }

        /// Always responds with a call to `unknown_tool`, which no workflow
        /// declares — used to exercise the runtime's unknown-tool handling.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let response = CompletionResponse {
                id: "resp_unknown_tool".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
                        id: "call_unknown".to_string(),
                        tool_type: ToolType::Function,
                        function: ToolCallFunction {
                            name: "unknown_tool".to_string(),
                            arguments: "{}".to_string(),
                        },
                    }]),
                    finish_reason: FinishReason::ToolCalls,
                    logprobs: None,
                }],
                usage: Usage::new(5, 2),
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5256
    /// Custom worker that returns the same fixed payload for every call.
    struct FixedToolWorker {
        payload: Value,
    }
5260
    /// Custom worker that counts executions and returns `{"ok": true}`.
    struct CountingToolWorker {
        execute_calls: AtomicUsize,
    }
5264
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
        /// Ignores all inputs and returns a clone of the configured payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            Ok(self.payload.clone())
        }
    }
5277
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
        /// Bumps the execution counter and returns a constant success payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            self.execute_calls.fetch_add(1, Ordering::SeqCst);
            Ok(json!({"ok": true}))
        }
    }
5291
    #[async_trait]
    impl YamlWorkflowLlmExecutor for CountingExecutor {
        /// Bumps the call counter and returns a fixed `{"state":"ok"}` result
        /// with no usage, TTFT, or tool calls.
        async fn complete_structured(
            &self,
            _request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: None,
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
5308
    impl YamlWorkflowEventSink for RecordingSink {
        /// Appends a clone of every emitted event to the shared buffer.
        fn emit(&self, event: &YamlWorkflowEvent) {
            self.events
                .lock()
                .expect("recording sink lock should not be poisoned")
                .push(event.clone());
        }
    }
5317
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
        /// Stores a clone of the invocation context (replacing any previous
        /// capture) and returns a constant success payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            context: &Value,
        ) -> Result<Value, String> {
            let mut guard = self
                .context
                .lock()
                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
            *guard = Some(context.clone());
            Ok(json!({"ok": true}))
        }
    }
5335
5336    #[async_trait]
5337    impl YamlWorkflowLlmExecutor for MockExecutor {
5338        async fn complete_structured(
5339            &self,
5340            request: YamlLlmExecutionRequest,
5341            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5342        ) -> Result<YamlLlmExecutionResult, String> {
5343            let prompt = request.prompt;
5344            if prompt.contains("exactly one category") {
5345                return Ok(YamlLlmExecutionResult {
5346                    payload: json!({"category":"termination","reason":"mock"}),
5347                    usage: Some(YamlLlmTokenUsage {
5348                        prompt_tokens: 10,
5349                        completion_tokens: 5,
5350                        total_tokens: 15,
5351                        thinking_tokens: None,
5352                    }),
5353                    ttft_ms: None,
5354                    tool_calls: Vec::new(),
5355                });
5356            }
5357            if prompt.contains("Determine termination subtype") {
5358                return Ok(YamlLlmExecutionResult {
5359                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5360                    usage: Some(YamlLlmTokenUsage {
5361                        prompt_tokens: 12,
5362                        completion_tokens: 6,
5363                        total_tokens: 18,
5364                        thinking_tokens: None,
5365                    }),
5366                    ttft_ms: None,
5367                    tool_calls: Vec::new(),
5368                });
5369            }
5370            if prompt.contains("Determine supply chain subtype") {
5371                return Ok(YamlLlmExecutionResult {
5372                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
5373                    usage: Some(YamlLlmTokenUsage {
5374                        prompt_tokens: 11,
5375                        completion_tokens: 4,
5376                        total_tokens: 15,
5377                        thinking_tokens: None,
5378                    }),
5379                    ttft_ms: None,
5380                    tool_calls: Vec::new(),
5381                });
5382            }
5383            Err("unexpected prompt".to_string())
5384        }
5385    }
5386
    // End-to-end run of the email-intake YAML: classify → switch → subtype
    // classify → switch → RAG custom worker. Verifies trace/timing parity and
    // that token totals aggregate the two mocked LLM usages.
    #[tokio::test]
    async fn runs_yaml_workflow_and_returns_step_timings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Determine termination subtype:
        {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
            .await
            .expect("yaml workflow should execute");

        assert_eq!(output.workflow_id, "email-intake-classification");
        // MockExecutor answers "termination" then "repeated_offense", so the
        // run must terminate at the repeated-offense RAG node.
        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
        // Every executed node produces both a trace entry and a step timing.
        assert!(!output.step_timings.is_empty());
        assert_eq!(output.step_timings.len(), output.trace.len());
        assert!(output
            .outputs
            .contains_key("rag_termination_repeated_offense"));
        // Totals are the sums of the two mocked usages: 10+12, 5+6, 15+18.
        assert_eq!(output.total_input_tokens, 22);
        assert_eq!(output.total_output_tokens, 11);
        assert_eq!(output.total_tokens, 33);
    }
5460
    // A single-node LLM workflow must emit a "node_llm_input_resolved"
    // telemetry event whose metadata carries the model, the fully rendered
    // prompt, and one binding entry per resolved template placeholder.
    #[tokio::test]
    async fn emits_resolved_llm_input_event_with_bindings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_email_workflow_yaml_with_custom_worker_and_events(
            &workflow,
            "Need help with termination",
            &MockExecutor,
            None,
            Some(&sink),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.terminal_node, "classify_top_level");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let llm_event = events
            .iter()
            .find(|event| event.event_type == "node_llm_input_resolved")
            .expect("expected llm input telemetry event");

        let metadata = llm_event
            .metadata
            .as_ref()
            .expect("llm input event must include metadata");
        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
        assert_eq!(metadata["stream_requested"], Value::Bool(false));
        assert_eq!(metadata["heal_requested"], Value::Bool(false));
        // The rendered prompt must contain the substituted email text.
        assert!(metadata["prompt"]
            .as_str()
            .expect("prompt should be a string")
            .contains("Need help with termination"));

        // Exactly one template placeholder ({{ input.email_text }}) resolves.
        let bindings = metadata["bindings"]
            .as_array()
            .expect("bindings should be an array");
        assert_eq!(bindings.len(), 1);
        assert_eq!(
            bindings[0]["source_path"],
            Value::String("input.email_text".to_string())
        );
        assert_eq!(
            bindings[0]["resolved"],
            Value::String("Need help with termination".to_string())
        );
        assert_eq!(bindings[0]["missing"], Value::Bool(false));
        assert_eq!(
            bindings[0]["resolved_type"],
            Value::String("string".to_string())
        );
    }
5533
    // With default run options, the "workflow_completed" event must carry a
    // "nerdstats" metadata block mirroring the run output (id, terminal node,
    // token totals sourced from provider usage).
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_by_default() {
        let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // Nerdstats must agree with the run output returned to the caller.
        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
        assert_eq!(
            nerdstats["terminal_node"],
            Value::String(output.terminal_node)
        );
        assert_eq!(
            nerdstats["total_tokens"],
            Value::Number(output.total_tokens.into())
        );
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_usage".to_string())
        );
        // MockExecutor never reports TTFT, so it surfaces as null.
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
5598
    // Disabling telemetry nerdstats in the run options must strip all
    // metadata from the "workflow_completed" event.
    #[tokio::test]
    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
        let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };
        // Same defaults as the positive test, except nerdstats is opted out.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                nerdstats: false,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &options,
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        assert!(completed.metadata.is_none());
    }
5648
    // LLM executor test double whose reported TTFT depends on whether the
    // request asked for streaming; used to verify streaming nerdstats.
    struct StreamAwareMockExecutor;

    #[async_trait]
    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: Some(YamlLlmTokenUsage {
                    prompt_tokens: 20,
                    completion_tokens: 10,
                    total_tokens: 30,
                    thinking_tokens: None,
                }),
                // TTFT is only reported when streaming was requested.
                ttft_ms: if request.stream { Some(12) } else { None },
                tool_calls: Vec::new(),
            })
        }
    }
5671
    // A streaming LLM node (stream: true) must still produce full nerdstats:
    // provider usage totals plus the TTFT reported by the executor.
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
        let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
    config:
      prompt: |
        Return JSON only:
        {"state":"ok"}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &StreamAwareMockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // StreamAwareMockExecutor reports usage of 30 tokens and TTFT of 12ms
        // when request.stream is true.
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
    }
5725
    // When an LLM step finished with no token usage at all, workflow_nerdstats
    // must flag token metrics as unavailable (stream usage missing) and null
    // out the totals rather than reporting misleading zeros.
    #[test]
    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
        // Hand-built run output: one LLM step with every token field None.
        let output = YamlWorkflowRunOutput {
            workflow_id: "workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["llm_node".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "llm_node".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "llm_node".to_string(),
                node_kind: "llm_call".to_string(),
                elapsed_ms: 100,
                prompt_tokens: None,
                completion_tokens: None,
                total_tokens: None,
                thinking_tokens: None,
                tokens_per_second: None,
            }],
            llm_node_metrics: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: None,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_tokens: 0,
            total_thinking_tokens: None,
            tokens_per_second: 0.0,
            trace_id: Some("trace-1".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_stream_usage_unavailable".to_string())
        );
        // Totals must be null, not 0, when metrics are unavailable.
        assert_eq!(nerdstats["total_tokens"], Value::Null);
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
5767
    // When the run output carries a TTFT measurement, workflow_nerdstats must
    // surface it as a number instead of null.
    #[test]
    fn workflow_nerdstats_includes_ttft_when_available() {
        // Hand-built run output: one LLM step with full token usage and
        // a recorded time-to-first-token of 42ms.
        let output = YamlWorkflowRunOutput {
            workflow_id: "workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["llm_node".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "llm_node".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "llm_node".to_string(),
                node_kind: "llm_call".to_string(),
                elapsed_ms: 100,
                prompt_tokens: Some(10),
                completion_tokens: Some(15),
                total_tokens: Some(25),
                thinking_tokens: None,
                tokens_per_second: Some(150.0),
            }],
            llm_node_metrics: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: Some(42),
            total_input_tokens: 10,
            total_output_tokens: 15,
            total_tokens: 25,
            total_thinking_tokens: None,
            tokens_per_second: 150.0,
            trace_id: Some("trace-2".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
    }
5803
    // LLM executor test double that insists the request carries exactly two
    // chat-history messages (system + user), proving messages_path wiring.
    struct MessageHistoryExecutor;

    #[async_trait]
    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            // Fail the node if the workflow did not forward the history.
            let messages = request
                .messages
                .ok_or_else(|| "expected messages in request".to_string())?;
            if messages.len() != 2 {
                return Err(format!("expected 2 messages, got {}", messages.len()));
            }
            // "reason":"history" lets the test confirm this path was taken.
            Ok(YamlLlmExecutionResult {
                payload: json!({"category":"termination","reason":"history"}),
                usage: Some(YamlLlmTokenUsage {
                    prompt_tokens: 7,
                    completion_tokens: 3,
                    total_tokens: 10,
                    thinking_tokens: None,
                }),
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
5832
    // An llm_call node with messages_path + append_prompt_as_user:false must
    // send the chat history from the workflow input verbatim to the executor.
    #[tokio::test]
    async fn supports_messages_path_in_workflow_input() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // email_text is present but should be ignored in favor of messages.
        let input = json!({
            "email_text": "ignored",
            "messages": [
                {"role": "system", "content": "You are a classifier"},
                {"role": "user", "content": "Please classify this"}
            ]
        });

        // MessageHistoryExecutor errors unless exactly 2 messages arrive.
        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
            .await
            .expect("workflow should use chat history from input");

        assert_eq!(output.terminal_node, "classify_top_level");
        assert_eq!(
            output.outputs["classify_top_level"]["output"]["reason"],
            Value::String("history".to_string())
        );
    }
5866
    // The three public run entrypoints (base, custom-worker wrapper, and the
    // events/options wrapper) must be behaviorally equivalent for the same
    // workflow and input.
    #[tokio::test]
    async fn wrapper_entrypoints_produce_equivalent_outputs() {
        let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let input = json!({"email_text":"hello"});

        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
            .await
            .expect("base entrypoint should execute");
        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
            .await
            .expect("custom worker wrapper should execute");
        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &input,
            &MockExecutor,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("events/options wrapper should execute");

        // Pairwise-compare the observable fields of all three runs.
        assert_eq!(a.workflow_id, b.workflow_id);
        assert_eq!(a.workflow_id, c.workflow_id);
        assert_eq!(a.terminal_node, b.terminal_node);
        assert_eq!(a.terminal_node, c.terminal_node);
        assert_eq!(a.outputs, b.outputs);
        assert_eq!(a.outputs, c.outputs);
        assert_eq!(a.total_tokens, b.total_tokens);
        assert_eq!(a.total_tokens, c.total_tokens);
    }
5912
    // Tool-calling round trip: the first node calls a simplified-format tool
    // (served by FixedToolWorker), records the tool call under the `audit`
    // global, and a second node reads that global in its prompt template.
    #[tokio::test]
    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
        let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
      output_schema:
        type: object
        properties:
          subject: { type: string }
          body: { type: string }
        required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ToolLoopProvider drives a scripted tool-call loop through a real
        // SimpleAgentsClient rather than a YamlWorkflowLlmExecutor mock.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
        // The tool call's output must be captured in the node's tool_calls.
        assert_eq!(
            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
            Value::String("Ava".to_string())
        );
        // The second node's prompt resolved the globals.audit reference.
        let body = output.outputs["personalize"]["output"]["body"]
            .as_str()
            .expect("body should be string");
        assert!(body.contains("Ava"));
    }
5995
    // A tool result that violates the tool's declared output_schema must
    // hard-fail the node with an Llm error, not continue with bad data.
    #[tokio::test]
    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
        let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // The worker returns a shape missing the required `customer_name`.
        let worker = FixedToolWorker {
            payload: json!({"unexpected": "shape"}),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should hard-fail on schema mismatch");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("output failed schema validation"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }
    }
6057
    // If the model requests a tool that is not declared on the node, the run
    // must fail before the custom worker is ever invoked (counter stays 0).
    #[tokio::test]
    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
        let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // UnknownToolProvider makes the model ask for an undeclared tool.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(UnknownToolProvider))
            .build()
            .expect("client should build");
        let worker = CountingToolWorker {
            execute_calls: AtomicUsize::new(0),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should reject unknown tool before executing worker");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }

        // The rejection must happen before any worker execution.
        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
    }
6114
    // Static validation: declaring tools in simplified shape while setting
    // tools_format: openai must surface an "invalid_tools_format" diagnostic.
    #[test]
    fn validates_tools_format_mismatch() {
        let yaml = r#"
id: mismatch
entry_node: generate
nodes:
  - id: generate
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: openai
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let diagnostics = verify_yaml_workflow(&workflow);
        assert!(diagnostics
            .iter()
            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
    }
6141
6142    #[test]
6143    fn mock_custom_worker_supports_get_employee_record() {
6144        let result = mock_custom_worker_output(
6145            "get_employee_record",
6146            &json!({"employee_name": "Alex Johnson"}),
6147        )
6148        .expect("mock tool should resolve employee record");
6149
6150        assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
6151        assert_eq!(result["location"], Value::String("Austin".to_string()));
6152    }
6153
    // Trace propagation: trace context (trace_id / traceparent) and tenant
    // context supplied via run options must be forwarded to the custom worker
    // inside context.trace.{context,tenant}.
    #[tokio::test]
    async fn custom_worker_receives_trace_context_block() {
        let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: demo
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let worker = CapturingWorker {
            context: Mutex::new(None),
        };
        // Fix the trace/tenant identifiers so the assertions are exact.
        let options = YamlWorkflowRunOptions {
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-ctx".to_string()),
                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                    request_id: Some("turn-7".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            Some(&worker),
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        let captured_context = worker
            .context
            .lock()
            .expect("capturing worker lock should not be poisoned")
            .clone()
            .expect("custom worker should receive context");

        // trace.context carries the ids supplied through the options.
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("trace_id"))
                .and_then(Value::as_str),
            Some("trace-fixed-ctx")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("traceparent"))
                .and_then(Value::as_str),
            Some("00-trace-fixed-ctx-span-fixed-01")
        );
        // trace.tenant carries the tenant identifiers.
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
        );
    }
6232
6233    #[tokio::test]
6234    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
6235        let yaml = r#"
6236id: cancellation-test
6237entry_node: classify
6238nodes:
6239  - id: classify
6240    node_type:
6241      llm_call:
6242        model: gpt-4.1
6243    config:
6244      prompt: |
6245        Classify this email into exactly one category:
6246        {{ input.email_text }}
6247"#;
6248
6249        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6250        let executor = CountingExecutor {
6251            call_count: AtomicUsize::new(0),
6252        };
6253        let sink = CancelAfterFirstEventSink {
6254            cancelled: AtomicBool::new(false),
6255        };
6256
6257        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6258            &workflow,
6259            &json!({"email_text":"hello"}),
6260            &executor,
6261            None,
6262            Some(&sink),
6263            &YamlWorkflowRunOptions::default(),
6264        )
6265        .await
6266        .expect_err("workflow should stop when sink signals cancellation");
6267
6268        assert!(matches!(
6269            err,
6270            YamlWorkflowRunError::EventSinkCancelled { .. }
6271        ));
6272        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
6273    }
6274
6275    #[tokio::test]
6276    async fn rejects_invalid_messages_path_shape() {
6277        let yaml = r#"
6278id: email-intake-classification
6279entry_node: classify_top_level
6280nodes:
6281  - id: classify_top_level
6282    node_type:
6283      llm_call:
6284        model: gpt-4.1
6285        messages_path: input.messages
6286"#;
6287
6288        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6289        let input = json!({
6290            "email_text": "ignored",
6291            "messages": "not-a-list"
6292        });
6293
6294        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6295            .await
6296            .expect_err("workflow should fail for invalid messages shape");
6297
6298        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
6299    }
6300
6301    #[test]
6302    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
6303        let yaml = r#"
6304id: chat-workflow
6305entry_node: decide
6306nodes:
6307  - id: decide
6308    node_type:
6309      switch:
6310        branches:
6311          - condition: '$.input.mode == "draft"'
6312            target: draft
6313        default: ask
6314  - id: draft
6315    node_type:
6316      llm_call:
6317        model: gpt-4.1
6318  - id: ask
6319    node_type:
6320      llm_call:
6321        model: gpt-4.1
6322edges:
6323  - from: draft
6324    to: ask
6325"#;
6326
6327        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6328        let mermaid = yaml_workflow_to_mermaid(&workflow);
6329
6330        assert!(mermaid.contains("flowchart TD"));
6331        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
6332        assert!(mermaid.contains("decide -- \"default\" --> ask"));
6333        assert!(mermaid.contains("draft --> ask"));
6334    }
6335
6336    #[test]
6337    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
6338        let yaml = r#"
6339id: tool-graph
6340entry_node: chat
6341nodes:
6342  - id: chat
6343    node_type:
6344      llm_call:
6345        model: gemini-3-flash
6346        tools_format: simplified
6347        tools:
6348          - name: run_workflow_graph
6349            input_schema:
6350              type: object
6351              properties:
6352                workflow_id: { type: string }
6353              required: [workflow_id]
6354edges: []
6355"#;
6356
6357        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6358        let mermaid = yaml_workflow_to_mermaid(&workflow);
6359
6360        assert!(mermaid.contains("chat__tool_0"));
6361        assert!(mermaid.contains("chat -.-> chat__tool_0"));
6362        assert!(mermaid.contains("classDef toolNode"));
6363        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
6364    }
6365
6366    #[test]
6367    fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6368        let base_dir = std::env::temp_dir().join(format!(
6369            "simple_agents_mermaid_subgraph_{}",
6370            std::time::SystemTime::now()
6371                .duration_since(std::time::UNIX_EPOCH)
6372                .expect("unix epoch")
6373                .as_nanos()
6374        ));
6375        fs::create_dir_all(&base_dir).expect("temp dir should be created");
6376
6377        let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6378        let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6379
6380        let orchestrator_yaml = r#"
6381id: email-chat-orchestrator-with-subgraph-tool
6382entry_node: respond_casual
6383nodes:
6384  - id: respond_casual
6385    node_type:
6386      llm_call:
6387        model: gemini-3-flash
6388        tools_format: simplified
6389        tools:
6390          - name: run_workflow_graph
6391            input_schema:
6392              type: object
6393              properties:
6394                workflow_id: { type: string }
6395              required: [workflow_id]
6396    config:
6397      prompt: |
6398        Call with:
6399        {
6400          "workflow_id": "hr_warning_email_subgraph"
6401        }
6402edges: []
6403"#;
6404
6405        let subgraph_yaml = r#"
6406id: hr-warning-email-subgraph
6407entry_node: draft_hr_warning_email
6408nodes:
6409  - id: draft_hr_warning_email
6410    node_type:
6411      llm_call:
6412        model: gemini-3-flash
6413edges: []
6414"#;
6415
6416        fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
6417        fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
6418
6419        let mermaid =
6420            yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
6421
6422        assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
6423        assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
6424        assert!(mermaid.contains("calls hr_warning_email_subgraph"));
6425        assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
6426
6427        fs::remove_dir_all(base_dir).expect("temp dir removed");
6428    }
6429
6430    #[test]
6431    fn converts_yaml_workflow_to_ir_definition() {
6432        let yaml = r#"
6433id: chat-workflow
6434entry_node: classify
6435nodes:
6436  - id: classify
6437    node_type:
6438      llm_call:
6439        model: gpt-4.1
6440    config:
6441      prompt: |
6442        classify
6443  - id: route
6444    node_type:
6445      switch:
6446        branches:
6447          - condition: '$.nodes.classify.output.kind == "x"'
6448            target: done
6449        default: done
6450  - id: done
6451    node_type:
6452      custom_worker:
6453        handler: GetRagData
6454    config:
6455      payload:
6456        topic: test
6457edges:
6458  - from: classify
6459    to: route
6460"#;
6461
6462        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6463        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
6464
6465        assert_eq!(ir.name, "chat-workflow");
6466        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
6467        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
6468        assert!(ir.nodes.iter().any(|n| n.id == "route"));
6469        assert!(ir.nodes.iter().any(|n| n.id == "done"));
6470    }
6471
6472    #[test]
6473    fn supports_yaml_to_ir_when_messages_path_is_used() {
6474        let yaml = r#"
6475id: chat-workflow
6476entry_node: classify
6477nodes:
6478  - id: classify
6479    node_type:
6480      llm_call:
6481        model: gpt-4.1
6482        messages_path: input.messages
6483"#;
6484
6485        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6486        let ir =
6487            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
6488        assert!(ir.nodes.iter().any(|node| matches!(
6489            node.kind,
6490            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
6491        )));
6492    }
6493
6494    #[tokio::test]
6495    async fn workflow_output_contains_trace_id_in_both_locations() {
6496        let yaml = r#"
6497id: trace-test
6498entry_node: classify
6499nodes:
6500  - id: classify
6501    node_type:
6502      llm_call:
6503        model: gpt-4.1
6504    config:
6505      prompt: |
6506        Classify this email into exactly one category:
6507        {{ input.email_text }}
6508"#;
6509
6510        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6511        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
6512            .await
6513            .expect("workflow should execute");
6514
6515        let trace_id = output
6516            .trace_id
6517            .as_deref()
6518            .expect("trace_id should be present");
6519        assert!(!trace_id.is_empty());
6520        assert_eq!(
6521            output.metadata.as_ref().and_then(|value| {
6522                value
6523                    .get("telemetry")
6524                    .and_then(|telemetry| telemetry.get("trace_id"))
6525                    .and_then(Value::as_str)
6526            }),
6527            Some(trace_id)
6528        );
6529    }
6530
6531    #[tokio::test]
6532    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
6533        let yaml = r#"
6534id: sample-rate-zero
6535entry_node: classify
6536nodes:
6537  - id: classify
6538    node_type:
6539      llm_call:
6540        model: gpt-4.1
6541    config:
6542      prompt: |
6543        Classify this email into exactly one category:
6544        {{ input.email_text }}
6545"#;
6546
6547        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6548        let options = YamlWorkflowRunOptions {
6549            telemetry: YamlWorkflowTelemetryConfig {
6550                sample_rate: 0.0,
6551                ..YamlWorkflowTelemetryConfig::default()
6552            },
6553            trace: YamlWorkflowTraceOptions {
6554                context: Some(YamlWorkflowTraceContextInput {
6555                    trace_id: Some("trace-sample-zero".to_string()),
6556                    ..YamlWorkflowTraceContextInput::default()
6557                }),
6558                tenant: YamlWorkflowTraceTenantContext::default(),
6559            },
6560            model: None,
6561        };
6562
6563        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6564            &workflow,
6565            &json!({"email_text":"hello"}),
6566            &MockExecutor,
6567            None,
6568            None,
6569            &options,
6570        )
6571        .await
6572        .expect("workflow should execute");
6573
6574        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
6575        assert_eq!(
6576            output
6577                .metadata
6578                .as_ref()
6579                .and_then(|value| value.get("telemetry"))
6580                .and_then(|telemetry| telemetry.get("sampled"))
6581                .and_then(Value::as_bool),
6582            Some(false)
6583        );
6584    }
6585
6586    #[test]
6587    fn trace_id_from_traceparent_parses_w3c_header() {
6588        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
6589        assert_eq!(
6590            trace_id_from_traceparent(traceparent).as_deref(),
6591            Some("4bf92f3577b34da6a3ce929d0e0e4736")
6592        );
6593    }
6594
6595    #[test]
6596    fn resolve_telemetry_context_marks_traceparent_source() {
6597        let mut options = YamlWorkflowRunOptions::default();
6598        options.trace.context = Some(YamlWorkflowTraceContextInput {
6599            traceparent: Some(
6600                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
6601            ),
6602            ..YamlWorkflowTraceContextInput::default()
6603        });
6604
6605        let context = resolve_telemetry_context(&options, None);
6606
6607        assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
6608        assert_eq!(
6609            context.trace_id.as_deref(),
6610            Some("4bf92f3577b34da6a3ce929d0e0e4736")
6611        );
6612    }
6613
6614    #[tokio::test]
6615    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
6616        let yaml = r#"
6617id: traceparent-derived
6618entry_node: classify
6619nodes:
6620  - id: classify
6621    node_type:
6622      llm_call:
6623        model: gpt-4.1
6624    config:
6625      prompt: |
6626        Classify this email into exactly one category:
6627        {{ input.email_text }}
6628"#;
6629
6630        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6631        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
6632        let options = YamlWorkflowRunOptions {
6633            telemetry: YamlWorkflowTelemetryConfig {
6634                sample_rate: 0.0,
6635                ..YamlWorkflowTelemetryConfig::default()
6636            },
6637            trace: YamlWorkflowTraceOptions {
6638                context: Some(YamlWorkflowTraceContextInput {
6639                    traceparent: Some(traceparent.to_string()),
6640                    ..YamlWorkflowTraceContextInput::default()
6641                }),
6642                tenant: YamlWorkflowTraceTenantContext::default(),
6643            },
6644            model: None,
6645        };
6646
6647        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6648            &workflow,
6649            &json!({"email_text":"hello"}),
6650            &MockExecutor,
6651            None,
6652            None,
6653            &options,
6654        )
6655        .await
6656        .expect("workflow should execute");
6657
6658        assert_eq!(
6659            output.trace_id.as_deref(),
6660            Some("4bf92f3577b34da6a3ce929d0e0e4736")
6661        );
6662        assert_eq!(
6663            output
6664                .metadata
6665                .as_ref()
6666                .and_then(|value| value.get("telemetry"))
6667                .and_then(|telemetry| telemetry.get("sampled"))
6668                .and_then(Value::as_bool),
6669            Some(false)
6670        );
6671    }
6672
6673    #[tokio::test]
6674    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
6675        let yaml = r#"
6676id: sample-rate-one
6677entry_node: classify
6678nodes:
6679  - id: classify
6680    node_type:
6681      llm_call:
6682        model: gpt-4.1
6683    config:
6684      prompt: |
6685        Classify this email into exactly one category:
6686        {{ input.email_text }}
6687"#;
6688
6689        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6690        let options = YamlWorkflowRunOptions {
6691            telemetry: YamlWorkflowTelemetryConfig {
6692                sample_rate: 1.0,
6693                ..YamlWorkflowTelemetryConfig::default()
6694            },
6695            trace: YamlWorkflowTraceOptions {
6696                context: Some(YamlWorkflowTraceContextInput {
6697                    trace_id: Some("trace-sample-one".to_string()),
6698                    ..YamlWorkflowTraceContextInput::default()
6699                }),
6700                tenant: YamlWorkflowTraceTenantContext::default(),
6701            },
6702            model: None,
6703        };
6704
6705        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6706            &workflow,
6707            &json!({"email_text":"hello"}),
6708            &MockExecutor,
6709            None,
6710            None,
6711            &options,
6712        )
6713        .await
6714        .expect("workflow should execute");
6715
6716        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
6717        assert_eq!(
6718            output
6719                .metadata
6720                .as_ref()
6721                .and_then(|value| value.get("telemetry"))
6722                .and_then(|telemetry| telemetry.get("sampled"))
6723                .and_then(Value::as_bool),
6724            Some(true)
6725        );
6726    }
6727
6728    #[tokio::test]
6729    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
6730        let yaml = r#"
6731id: sample-rate-deterministic
6732entry_node: classify
6733nodes:
6734  - id: classify
6735    node_type:
6736      llm_call:
6737        model: gpt-4.1
6738    config:
6739      prompt: |
6740        Classify this email into exactly one category:
6741        {{ input.email_text }}
6742"#;
6743
6744        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6745        let options = YamlWorkflowRunOptions {
6746            telemetry: YamlWorkflowTelemetryConfig {
6747                sample_rate: 0.5,
6748                ..YamlWorkflowTelemetryConfig::default()
6749            },
6750            trace: YamlWorkflowTraceOptions {
6751                context: Some(YamlWorkflowTraceContextInput {
6752                    trace_id: Some("trace-sample-deterministic".to_string()),
6753                    ..YamlWorkflowTraceContextInput::default()
6754                }),
6755                tenant: YamlWorkflowTraceTenantContext::default(),
6756            },
6757            model: None,
6758        };
6759
6760        let mut sampled_values = Vec::new();
6761        for _ in 0..3 {
6762            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6763                &workflow,
6764                &json!({"email_text":"hello"}),
6765                &MockExecutor,
6766                None,
6767                None,
6768                &options,
6769            )
6770            .await
6771            .expect("workflow should execute");
6772
6773            let sampled = output
6774                .metadata
6775                .as_ref()
6776                .and_then(|value| value.get("telemetry"))
6777                .and_then(|telemetry| telemetry.get("sampled"))
6778                .and_then(Value::as_bool)
6779                .expect("sampled flag should be present");
6780            sampled_values.push(sampled);
6781        }
6782
6783        assert!(sampled_values
6784            .iter()
6785            .all(|value| *value == sampled_values[0]));
6786    }
6787
6788    #[tokio::test]
6789    async fn workflow_run_options_reject_invalid_sample_rate() {
6790        let yaml = r#"
6791id: sample-rate-invalid
6792entry_node: classify
6793nodes:
6794  - id: classify
6795    node_type:
6796      llm_call:
6797        model: gpt-4.1
6798    config:
6799      prompt: |
6800        Classify this email into exactly one category:
6801        {{ input.email_text }}
6802"#;
6803
6804        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6805        let options = YamlWorkflowRunOptions {
6806            telemetry: YamlWorkflowTelemetryConfig {
6807                sample_rate: 1.1,
6808                ..YamlWorkflowTelemetryConfig::default()
6809            },
6810            ..YamlWorkflowRunOptions::default()
6811        };
6812
6813        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6814            &workflow,
6815            &json!({"email_text":"hello"}),
6816            &MockExecutor,
6817            None,
6818            None,
6819            &options,
6820        )
6821        .await
6822        .expect_err("invalid sample_rate should fail");
6823
6824        match err {
6825            YamlWorkflowRunError::InvalidInput { message } => {
6826                assert!(message.contains("telemetry.sample_rate"));
6827            }
6828            _ => panic!("expected invalid input error for sample_rate"),
6829        }
6830    }
6831
6832    #[tokio::test]
6833    async fn workflow_run_options_reject_nan_sample_rate() {
6834        let yaml = r#"
6835id: sample-rate-invalid
6836entry_node: classify
6837nodes:
6838  - id: classify
6839    node_type:
6840      llm_call:
6841        model: gpt-4.1
6842    config:
6843      prompt: |
6844        Classify this email into exactly one category:
6845        {{ input.email_text }}
6846"#;
6847
6848        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6849        let options = YamlWorkflowRunOptions {
6850            telemetry: YamlWorkflowTelemetryConfig {
6851                sample_rate: f32::NAN,
6852                ..YamlWorkflowTelemetryConfig::default()
6853            },
6854            ..YamlWorkflowRunOptions::default()
6855        };
6856
6857        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6858            &workflow,
6859            &json!({"email_text":"hello"}),
6860            &MockExecutor,
6861            None,
6862            None,
6863            &options,
6864        )
6865        .await
6866        .expect_err("nan sample_rate should fail");
6867
6868        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
6869    }
6870
6871    #[test]
6872    fn subworkflow_options_inherit_parent_telemetry_configuration() {
6873        let mut parent_options = YamlWorkflowRunOptions::default();
6874        parent_options.telemetry.sample_rate = 0.0;
6875        parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
6876        parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
6877        parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
6878
6879        let parent_context = TraceContext {
6880            trace_id: Some("trace-parent".to_string()),
6881            span_id: Some("span-parent".to_string()),
6882            parent_span_id: None,
6883            traceparent: Some("00-trace-parent-span-parent-01".to_string()),
6884            tracestate: None,
6885            baggage: BTreeMap::new(),
6886        };
6887
6888        let subworkflow_options =
6889            build_subworkflow_options(&parent_options, Some(&parent_context), None);
6890
6891        assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
6892        assert_eq!(
6893            subworkflow_options.telemetry.payload_mode,
6894            YamlWorkflowPayloadMode::RedactedPayload
6895        );
6896        assert_eq!(
6897            subworkflow_options.telemetry.tool_trace_mode,
6898            YamlToolTraceMode::Redacted
6899        );
6900        assert_eq!(
6901            subworkflow_options.trace.tenant.conversation_id.as_deref(),
6902            Some("conv-123")
6903        );
6904        assert_eq!(
6905            subworkflow_options
6906                .trace
6907                .context
6908                .as_ref()
6909                .and_then(|ctx| ctx.trace_id.as_deref()),
6910            Some("trace-parent")
6911        );
6912    }
6913
6914    #[tokio::test]
6915    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
6916        let yaml = r#"
6917id: trace-options-test
6918entry_node: classify
6919nodes:
6920  - id: classify
6921    node_type:
6922      llm_call:
6923        model: gpt-4.1
6924    config:
6925      prompt: |
6926        Classify this email into exactly one category:
6927        {{ input.email_text }}
6928"#;
6929
6930        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6931        let options = YamlWorkflowRunOptions {
6932            telemetry: YamlWorkflowTelemetryConfig {
6933                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
6934                ..YamlWorkflowTelemetryConfig::default()
6935            },
6936            trace: YamlWorkflowTraceOptions {
6937                context: Some(YamlWorkflowTraceContextInput {
6938                    trace_id: Some("trace-fixed-123".to_string()),
6939                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
6940                    ..YamlWorkflowTraceContextInput::default()
6941                }),
6942                tenant: YamlWorkflowTraceTenantContext {
6943                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
6944                    ..YamlWorkflowTraceTenantContext::default()
6945                },
6946            },
6947            model: None,
6948        };
6949
6950        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6951            &workflow,
6952            &json!({"email_text":"hello"}),
6953            &MockExecutor,
6954            None,
6955            None,
6956            &options,
6957        )
6958        .await
6959        .expect("workflow should execute");
6960
6961        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
6962        assert_eq!(
6963            output
6964                .metadata
6965                .as_ref()
6966                .and_then(|value| value.get("telemetry"))
6967                .and_then(|telemetry| telemetry.get("payload_mode"))
6968                .and_then(Value::as_str),
6969            Some("redacted_payload")
6970        );
6971        assert_eq!(
6972            output
6973                .metadata
6974                .as_ref()
6975                .and_then(|value| value.get("trace"))
6976                .and_then(|trace| trace.get("tenant"))
6977                .and_then(|tenant| tenant.get("conversation_id"))
6978                .and_then(Value::as_str),
6979            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
6980        );
6981    }
6982
6983    #[test]
6984    fn streamed_payload_parser_extracts_last_json_object() {
6985        let raw = r#"{"state":"missing_scenario","reason":"ok"}
6986
6987extra reasoning text
6988
6989{"state":"ready","reason":"final"}"#;
6990
6991        let resolved = parse_streamed_structured_payload(raw, false)
6992            .expect("parser should extract final JSON object");
6993        assert_eq!(resolved.payload["state"], "ready");
6994        assert!(resolved.heal_confidence.is_none());
6995    }
6996
6997    #[test]
6998    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
6999        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7000
7001        let resolved = parse_streamed_structured_payload(raw, false)
7002            .expect("parser should recover final structured JSON object");
7003        assert_eq!(resolved.payload["state"], "ready");
7004    }
7005
7006    #[test]
7007    fn streamed_payload_parser_handles_markdown_with_heal() {
7008        let raw = r#"Some preface
7009```json
7010{
7011  "state": "missing_scenario",
7012  "reason": "Need more details"
7013}
7014```
7015Some trailing explanation"#;
7016
7017        let resolved = parse_streamed_structured_payload(raw, true)
7018            .expect("heal path should parse JSON block");
7019        assert_eq!(resolved.payload["state"], "missing_scenario");
7020        assert!(resolved.heal_confidence.is_some());
7021    }
7022
7023    #[test]
7024    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7025        let raw = "No JSON in this streamed output";
7026        let error = parse_streamed_structured_payload(raw, false)
7027            .expect_err("strict stream parse should fail without JSON candidate");
7028        assert!(error.contains("no JSON object candidate found"));
7029    }
7030
7031    #[test]
7032    fn include_raw_stream_debug_events_defaults_to_false() {
7033        let _guard = stream_debug_env_lock()
7034            .lock()
7035            .expect("stream debug env lock should not be poisoned");
7036        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7037        assert!(!include_raw_stream_debug_events());
7038    }
7039
7040    #[test]
7041    fn include_raw_stream_debug_events_accepts_truthy_values() {
7042        let _guard = stream_debug_env_lock()
7043            .lock()
7044            .expect("stream debug env lock should not be poisoned");
7045        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7046        assert!(include_raw_stream_debug_events());
7047        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7048    }
7049
7050    #[test]
7051    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7052        let mut filter = StructuredJsonDeltaFilter::default();
7053        let chunks = vec![
7054            "I will think first... ",
7055            "{\"state\":\"missing_scenario\",",
7056            "\"reason\":\"Need more details\"}",
7057            " additional commentary",
7058        ];
7059
7060        let filtered = chunks
7061            .into_iter()
7062            .filter_map(|chunk| filter.split(chunk).0)
7063            .collect::<String>();
7064
7065        assert_eq!(
7066            filtered,
7067            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7068        );
7069    }
7070
7071    #[test]
7072    fn structured_json_delta_filter_handles_braces_inside_strings() {
7073        let mut filter = StructuredJsonDeltaFilter::default();
7074        let chunks = vec![
7075            "preface ",
7076            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7077            " trailing",
7078        ];
7079
7080        let filtered = chunks
7081            .into_iter()
7082            .filter_map(|chunk| filter.split(chunk).0)
7083            .collect::<String>();
7084
7085        assert_eq!(
7086            filtered,
7087            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7088        );
7089    }
7090
7091    #[test]
7092    fn render_json_object_as_text_converts_top_level_fields() {
7093        let rendered =
7094            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7095        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7096
7097        assert_eq!(lines.len(), 3);
7098        assert!(lines.contains("question: q"));
7099        assert!(lines.contains("confidence: 0.8"));
7100        assert!(lines.contains("nested: {\"a\":1}"));
7101    }
7102
7103    #[test]
7104    fn stream_json_as_text_formatter_emits_once_when_complete() {
7105        let mut formatter = StreamJsonAsTextFormatter::default();
7106        formatter.push("{\"question\":\"hello\"}");
7107
7108        let first = formatter.emit_if_ready(true);
7109        let second = formatter.emit_if_ready(true);
7110
7111        assert_eq!(first, Some("question: hello".to_string()));
7112        assert_eq!(second, None);
7113    }
7114
7115    #[test]
7116    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7117        let expr = "$.nodes.classify.output.output_total == 1";
7118        let rewritten = rewrite_yaml_condition_to_ir(expr);
7119        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7120    }
7121
7122    #[tokio::test]
7123    async fn validates_workflow_input_before_ir_runtime_path() {
7124        let yaml = r#"
7125id: chat-workflow
7126entry_node: classify
7127nodes:
7128  - id: classify
7129    node_type:
7130      llm_call:
7131        model: gpt-4.1
7132    config:
7133      prompt: |
7134        classify
7135"#;
7136
7137        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7138        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
7139            .await
7140            .expect_err("non-object input should fail before execution");
7141
7142        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7143    }
7144
7145    #[test]
7146    fn interpolate_template_supports_dollar_prefixed_paths() {
7147        let context = json!({
7148            "input": {
7149                "email_text": "hello"
7150            }
7151        });
7152
7153        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7154        assert_eq!(rendered, "value=hello");
7155    }
7156}