// simple_agents_workflow/yaml_runner.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14    ToolFunction, ToolType,
15};
16use simple_agents_core::{
17    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24    flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29    WorkflowRuntimeOptions,
30};
31use crate::validation::validate_and_normalize;
32use crate::visualize::workflow_to_mermaid;
33
// Node id literal `__yaml_start`; its consumers are outside this part of the file.
// NOTE(review): presumably the synthetic entry node added for YAML workflows — confirm.
const YAML_START_NODE_ID: &str = "__yaml_start";
// Tool id literal `__yaml_llm_call`; its consumers are outside this part of the file.
// NOTE(review): presumably the internal tool backing LLM-call nodes — confirm.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

// Process-wide counter starting at 1; `generate_trace_id` increments it once per
// generated id.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
38
/// Timing and token figures recorded for a single workflow step.
///
/// Every `Option` field is omitted from the serialized form when it is `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the node this entry describes.
    pub node_id: String,
    /// Node kind label; `workflow_nerdstats` compares it against `"llm_call"`.
    pub node_kind: String,
    /// Model name associated with the step, when one is recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Elapsed time of the step in milliseconds.
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    /// `None` on an `"llm_call"` step marks that step as lacking provider usage
    /// (see `workflow_nerdstats`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
57
/// Token and timing metrics for one LLM node.
///
/// Unlike `YamlStepTiming`, the token counts here are mandatory; only
/// `reasoning_tokens` is optional and skipped in serialization when `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Elapsed time in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
68
/// Serializable result of one YAML workflow run: per-node outputs, timings and
/// aggregated token statistics.
///
/// `Option` fields carrying `skip_serializing_if` are omitted from the
/// serialized form when `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    // NOTE(review): the producer of this field is outside this part of the file;
    // presumably the input text the workflow ran on — confirm.
    pub email_text: String,
    pub trace: Vec<String>,
    /// JSON outputs keyed by string (presumably node id — confirm against the producer).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    /// Output of the terminal node; exported as `langfuse.trace.output` when present.
    pub terminal_output: Option<Value>,
    /// Per-step timing entries; `workflow_nerdstats` scans these for LLM steps
    /// without usage data.
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub llm_node_models: BTreeMap<String, String>,
    /// Total elapsed time of the run in milliseconds.
    pub total_elapsed_ms: u128,
    /// Time to first token in milliseconds, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
95
/// How payloads are written onto trace spans (see `payload_for_span`).
///
/// Serialized in `snake_case`; the default is `FullPayload`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    /// The payload is emitted as its full JSON text.
    #[default]
    FullPayload,
    /// Only a `{"redacted": true, "value_type": ...}` placeholder is emitted.
    RedactedPayload,
}
103
/// How tool payloads are recorded in tool traces (see `payload_for_tool_trace`).
///
/// Serialized in `snake_case`; the default is `Full`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    /// The payload is recorded unchanged.
    #[default]
    Full,
    /// The payload is replaced by a redaction marker carrying only its JSON type name.
    Redacted,
    /// The payload is replaced by JSON `null`.
    Off,
}
112
/// Caller-supplied trace context for a run.
///
/// Every field is optional when deserializing (`#[serde(default)]`).
/// `trace_context_from_options` copies the fields one-to-one into a `TraceContext`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    /// Explicit trace id; takes priority over every other source in
    /// `resolve_run_trace_id_with_source`.
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// `traceparent` header value; its trace-id field is extracted by
    /// `trace_id_from_traceparent` when no explicit trace id is available.
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// Key/value baggage; used for workers when the span context has no baggage.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
128
/// Tenant identifiers attached to spans and run metadata.
///
/// Every field is optional; `apply_trace_tenant_attributes_from_tenant` only
/// writes attributes for the fields that are set.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    /// Also exported as `user.id` and `langfuse.user.id`.
    #[serde(default)]
    pub user_id: Option<String>,
    /// Also exported as `session.id` and `langfuse.session.id`.
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
142
/// Telemetry settings for a workflow run.
///
/// Missing fields deserialize to the same values as the `Default` impl:
/// enabled, nerdstats on, sample rate `1.0`, full payloads, 30 retention days,
/// multi-tenant on, full tool traces.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// When `false`, no trace id is resolved for the run.
    #[serde(default = "default_true")]
    pub enabled: bool,
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Fraction of traces to sample; `validate_sample_rate` accepts `0.0..=1.0`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    // In the visible code this value is only echoed into the run metadata.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    // In the visible code this value is only echoed into the run metadata.
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
160
161impl Default for YamlWorkflowTelemetryConfig {
162    fn default() -> Self {
163        Self {
164            enabled: true,
165            nerdstats: true,
166            sample_rate: 1.0,
167            payload_mode: YamlWorkflowPayloadMode::FullPayload,
168            retention_days: 30,
169            multi_tenant: true,
170            tool_trace_mode: YamlToolTraceMode::Full,
171        }
172    }
173}
174
/// Trace-related options for a run: optional incoming context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    /// Caller-supplied trace context, if any.
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    /// Tenant identifiers; defaults to all-`None`.
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
182
/// Per-run options; every field falls back to its default when absent.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    // NOTE(review): presumably the run-level model override consumed through
    // `resolve_requested_model` — the call site is outside this view, confirm.
    #[serde(default)]
    pub model: Option<String>,
}
192
/// Token usage reported for a single LLM call.
///
/// `YamlTokenTotals::add_usage` folds these counts into the run totals.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning token count, when one is available.
    pub reasoning_tokens: Option<u32>,
}
200
/// Result of executing one LLM call: the JSON payload plus optional usage,
/// time-to-first-token and the tool calls traced along the way.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    /// Token usage, when available.
    pub usage: Option<YamlLlmTokenUsage>,
    /// Time to first token in milliseconds, when measured.
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
208
/// Trace record for a single tool call.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    pub arguments: Value,
    pub output: Option<Value>,
    // NOTE(review): free-form status label; the set of values is defined by the
    // producer outside this view.
    pub status: String,
    /// Elapsed time of the call in milliseconds.
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
219
/// Running token totals across the LLM calls of a run.
///
/// Starts at zero with `reasoning_tokens` unset (`Default`).
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    /// Stays `None` until the first usage that carries reasoning tokens is added.
    reasoning_tokens: Option<u64>,
}
227
228impl YamlTokenTotals {
229    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
230        self.input_tokens += u64::from(usage.prompt_tokens);
231        self.output_tokens += u64::from(usage.completion_tokens);
232        self.total_tokens += u64::from(usage.total_tokens);
233
234        if let Some(reasoning_tokens) = usage.reasoning_tokens {
235            let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
236            self.reasoning_tokens = Some(next);
237        }
238    }
239
240    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
241        if elapsed_ms == 0 {
242            return 0.0;
243        }
244        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
245    }
246}
247
/// Rounds `value` to two decimal places (scale by 100, round, scale back).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}
251
252fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
253    if elapsed_ms == 0 {
254        return 0.0;
255    }
256    round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
257}
258
/// Picks the model to request for a node.
///
/// A run-level override wins when it is non-empty after trimming; the trimmed
/// override is returned. Otherwise the node's own model is returned unchanged.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    let chosen = match run_model_override.map(str::trim) {
        Some(name) if !name.is_empty() => name,
        _ => node_model,
    };
    chosen.to_string()
}
271
/// Serde default helper returning `true`; used for the `enabled`, `nerdstats`
/// and `multi_tenant` telemetry fields.
fn default_true() -> bool {
    true
}
275
/// Serde default for `telemetry.sample_rate`: `1.0`, which `should_sample_trace`
/// treats as "sample every trace".
fn default_sample_rate() -> f32 {
    1.0
}
279
/// Serde default for `telemetry.retention_days`: 30.
fn default_retention_days() -> u32 {
    30
}
283
284fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
285    if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
286        return Ok(());
287    }
288
289    Err(YamlWorkflowRunError::InvalidInput {
290        message: format!(
291            "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
292        ),
293    })
294}
295
/// Decides deterministically whether a trace is sampled.
///
/// Rates `>= 1.0` always sample and rates `<= 0.0` never do. In between, the
/// trace id is hashed with 64-bit FNV-1a, the hash is mapped onto `[0, 1]`, and
/// the trace is sampled when that position falls below `sample_rate`. The same
/// trace id therefore always yields the same decision for a given rate.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    let digest = trace_id.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });

    let position = (digest as f64) / (u64::MAX as f64);
    position < f64::from(sample_rate)
}
313
/// Extracts the trace id from a W3C `traceparent` header value.
///
/// The value must consist of exactly four `-`-separated fields:
/// `version-trace_id-parent_id-flags`. Surrounding whitespace is ignored.
/// Returns `None` unless all of the following hold:
///
/// * `version` is two hex digits and is not `ff` (reserved as invalid),
/// * `trace_id` is 32 hex digits and not all zeros,
/// * `parent_id` is 16 hex digits and not all zeros,
/// * `flags` is two hex digits.
///
/// Upper-case hex digits are tolerated; the returned trace id is lower-cased.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    /// True when `field` is exactly `len` ASCII hex digits.
    fn is_hex(field: &str, len: usize) -> bool {
        field.len() == len && field.bytes().all(|byte| byte.is_ascii_hexdigit())
    }

    /// True when every byte of `field` is the digit `0`.
    fn is_all_zero(field: &str) -> bool {
        field.bytes().all(|byte| byte == b'0')
    }

    let mut parts = traceparent.trim().split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if parts.next().is_some() {
        return None;
    }

    // Version `ff` is explicitly forbidden by the Trace Context specification.
    if !is_hex(version, 2) || version.eq_ignore_ascii_case("ff") {
        return None;
    }
    if !is_hex(trace_id, 32) || is_all_zero(trace_id) {
        return None;
    }
    // A malformed parent id or flags field means the header as a whole cannot
    // be trusted, so its trace id is not adopted either.
    if !is_hex(span_id, 16) || is_all_zero(span_id) {
        return None;
    }
    if !is_hex(flags, 2) {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
340
/// Where the trace id of a run came from, as decided by
/// `resolve_run_trace_id_with_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry is disabled; no trace id was resolved.
    Disabled,
    /// `trace_id` supplied in the run options' trace context.
    ExplicitTraceId,
    /// `trace_id` taken from the parent trace context.
    ParentTraceId,
    /// Parsed from the `traceparent` in the run options' trace context.
    Traceparent,
    /// Parsed from the parent trace context's `traceparent`.
    ParentTraceparent,
    /// Freshly generated by `generate_trace_id`.
    Generated,
}
350
/// Outcome of resolving telemetry for a run (see `resolve_telemetry_context`).
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    /// Resolved trace id; `None` when telemetry is disabled.
    trace_id: Option<String>,
    /// Sampling decision for the trace; always `false` when there is no trace id.
    sampled: bool,
    /// Which source the trace id was taken from.
    trace_id_source: TraceIdSource,
}
357
358fn resolve_run_trace_id_with_source(
359    options: &YamlWorkflowRunOptions,
360    parent_trace_context: Option<&TraceContext>,
361) -> (Option<String>, TraceIdSource) {
362    if !options.telemetry.enabled {
363        return (None, TraceIdSource::Disabled);
364    }
365
366    if let Some(trace_id) = options
367        .trace
368        .context
369        .as_ref()
370        .and_then(|context| context.trace_id.clone())
371    {
372        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
373    }
374
375    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
376        return (Some(trace_id), TraceIdSource::ParentTraceId);
377    }
378
379    if let Some(trace_id) = options
380        .trace
381        .context
382        .as_ref()
383        .and_then(|context| context.traceparent.as_deref())
384        .and_then(trace_id_from_traceparent)
385    {
386        return (Some(trace_id), TraceIdSource::Traceparent);
387    }
388
389    if let Some(trace_id) = parent_trace_context
390        .and_then(|context| context.traceparent.as_deref())
391        .and_then(trace_id_from_traceparent)
392    {
393        return (Some(trace_id), TraceIdSource::ParentTraceparent);
394    }
395
396    (Some(generate_trace_id()), TraceIdSource::Generated)
397}
398
399fn resolve_telemetry_context(
400    options: &YamlWorkflowRunOptions,
401    parent_trace_context: Option<&TraceContext>,
402) -> ResolvedTelemetryContext {
403    let (trace_id, trace_id_source) =
404        resolve_run_trace_id_with_source(options, parent_trace_context);
405    let sampled = trace_id
406        .as_deref()
407        .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
408        .unwrap_or(false);
409
410    ResolvedTelemetryContext {
411        trace_id,
412        sampled,
413        trace_id_source,
414    }
415}
416
417fn generate_trace_id() -> String {
418    use std::time::{SystemTime, UNIX_EPOCH};
419
420    let now_nanos = SystemTime::now()
421        .duration_since(UNIX_EPOCH)
422        .map(|duration| duration.as_nanos())
423        .unwrap_or(0);
424    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
425    format!("{:032x}", now_nanos ^ sequence)
426}
427
428fn workflow_metadata_with_trace(
429    options: &YamlWorkflowRunOptions,
430    trace_id: &str,
431    sampled: bool,
432    trace_id_source: TraceIdSource,
433) -> Value {
434    json!({
435        "telemetry": {
436            "trace_id": trace_id,
437            "trace_id_source": match trace_id_source {
438                TraceIdSource::Disabled => "disabled",
439                TraceIdSource::ExplicitTraceId => "explicit_trace_id",
440                TraceIdSource::ParentTraceId => "parent_trace_id",
441                TraceIdSource::Traceparent => "traceparent",
442                TraceIdSource::ParentTraceparent => "parent_traceparent",
443                TraceIdSource::Generated => "generated",
444            },
445            "enabled": options.telemetry.enabled,
446            "sampled": sampled,
447            "nerdstats": options.telemetry.nerdstats,
448            "sample_rate": options.telemetry.sample_rate,
449            "payload_mode": match options.telemetry.payload_mode {
450                YamlWorkflowPayloadMode::FullPayload => "full_payload",
451                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
452            },
453            "retention_days": options.telemetry.retention_days,
454            "multi_tenant": options.telemetry.multi_tenant,
455            "tool_trace_mode": match options.telemetry.tool_trace_mode {
456                YamlToolTraceMode::Full => "full",
457                YamlToolTraceMode::Redacted => "redacted",
458                YamlToolTraceMode::Off => "off",
459            },
460        },
461        "trace": {
462            "tenant": {
463                "workspace_id": options.trace.tenant.workspace_id,
464                "user_id": options.trace.tenant.user_id,
465                "conversation_id": options.trace.tenant.conversation_id,
466                "request_id": options.trace.tenant.request_id,
467                "run_id": options.trace.tenant.run_id,
468            }
469        },
470    })
471}
472
473fn apply_trace_identity_attributes(span: &mut dyn WorkflowSpan, trace_id: Option<&str>) {
474    if let Some(value) = trace_id {
475        span.set_attribute("trace_id", value);
476    }
477}
478
479fn apply_trace_tenant_attributes_from_tenant(
480    span: &mut dyn WorkflowSpan,
481    tenant: &YamlWorkflowTraceTenantContext,
482) {
483    if let Some(workspace_id) = tenant.workspace_id.as_deref() {
484        span.set_attribute("tenant.workspace_id", workspace_id);
485    }
486    if let Some(user_id) = tenant.user_id.as_deref() {
487        span.set_attribute("tenant.user_id", user_id);
488        span.set_attribute("user.id", user_id);
489        span.set_attribute("langfuse.user.id", user_id);
490    }
491    if let Some(conversation_id) = tenant.conversation_id.as_deref() {
492        span.set_attribute("tenant.conversation_id", conversation_id);
493        span.set_attribute("session.id", conversation_id);
494        span.set_attribute("langfuse.session.id", conversation_id);
495    }
496    if let Some(request_id) = tenant.request_id.as_deref() {
497        span.set_attribute("tenant.request_id", request_id);
498    }
499    if let Some(run_id) = tenant.run_id.as_deref() {
500        span.set_attribute("tenant.run_id", run_id);
501    }
502}
503
504fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
505    apply_trace_tenant_attributes_from_tenant(span, &options.trace.tenant);
506}
507
508fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
509    let llm_nodes_without_usage: Vec<String> = output
510        .step_timings
511        .iter()
512        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
513        .map(|step| step.node_id.clone())
514        .collect();
515    let token_metrics_available = llm_nodes_without_usage.is_empty();
516    let token_metrics_source = if token_metrics_available {
517        "provider_usage"
518    } else {
519        "provider_stream_usage_unavailable"
520    };
521    let total_input_tokens = if token_metrics_available {
522        json!(output.total_input_tokens)
523    } else {
524        Value::Null
525    };
526    let total_output_tokens = if token_metrics_available {
527        json!(output.total_output_tokens)
528    } else {
529        Value::Null
530    };
531    let total_tokens = if token_metrics_available {
532        json!(output.total_tokens)
533    } else {
534        Value::Null
535    };
536    let total_reasoning_tokens = if token_metrics_available {
537        json!(output.total_reasoning_tokens)
538    } else {
539        Value::Null
540    };
541    let tokens_per_second = if token_metrics_available {
542        json!(output.tokens_per_second)
543    } else {
544        Value::Null
545    };
546
547    json!({
548        "workflow_id": output.workflow_id,
549        "terminal_node": output.terminal_node,
550        "total_elapsed_ms": output.total_elapsed_ms,
551        "ttft_ms": output.ttft_ms,
552        "step_details": output.step_timings,
553        "total_input_tokens": total_input_tokens,
554        "total_output_tokens": total_output_tokens,
555        "total_tokens": total_tokens,
556        "total_reasoning_tokens": total_reasoning_tokens,
557        "tokens_per_second": tokens_per_second,
558        "trace_id": output.trace_id,
559        "token_metrics_available": token_metrics_available,
560        "token_metrics_source": token_metrics_source,
561        "llm_nodes_without_usage": llm_nodes_without_usage,
562    })
563}
564
565fn apply_langfuse_nerdstats_attributes(
566    span: &mut dyn WorkflowSpan,
567    output: &YamlWorkflowRunOutput,
568    enabled: bool,
569) {
570    if !enabled {
571        return;
572    }
573
574    let nerdstats = workflow_nerdstats(output);
575    let nerdstats_json = nerdstats.to_string();
576    span.set_attribute("langfuse.trace.metadata.nerdstats", nerdstats_json.as_str());
577
578    span.set_attribute(
579        "langfuse.trace.metadata.nerdstats.workflow_id",
580        output.workflow_id.as_str(),
581    );
582    span.set_attribute(
583        "langfuse.trace.metadata.nerdstats.terminal_node",
584        output.terminal_node.as_str(),
585    );
586    span.set_attribute(
587        "langfuse.trace.metadata.nerdstats.total_elapsed_ms",
588        output.total_elapsed_ms.to_string().as_str(),
589    );
590    span.set_attribute(
591        "langfuse.trace.metadata.nerdstats.step_details_count",
592        output.step_timings.len().to_string().as_str(),
593    );
594    span.set_attribute(
595        "langfuse.trace.metadata.nerdstats.total_input_tokens",
596        output.total_input_tokens.to_string().as_str(),
597    );
598    span.set_attribute(
599        "langfuse.trace.metadata.nerdstats.total_output_tokens",
600        output.total_output_tokens.to_string().as_str(),
601    );
602    span.set_attribute(
603        "langfuse.trace.metadata.nerdstats.total_tokens",
604        output.total_tokens.to_string().as_str(),
605    );
606    span.set_attribute(
607        "langfuse.trace.metadata.nerdstats.tokens_per_second",
608        output.tokens_per_second.to_string().as_str(),
609    );
610
611    if let Some(ttft_ms) = output.ttft_ms {
612        span.set_attribute(
613            "langfuse.trace.metadata.nerdstats.ttft_ms",
614            ttft_ms.to_string().as_str(),
615        );
616    }
617
618    if let Some(reasoning_tokens) = output.total_reasoning_tokens {
619        span.set_attribute(
620            "langfuse.trace.metadata.nerdstats.total_reasoning_tokens",
621            reasoning_tokens.to_string().as_str(),
622        );
623    }
624
625    let llm_nodes_without_usage_count = output
626        .step_timings
627        .iter()
628        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
629        .count();
630    let token_metrics_available = llm_nodes_without_usage_count == 0;
631    span.set_attribute(
632        "langfuse.trace.metadata.nerdstats.token_metrics_available",
633        if token_metrics_available {
634            "true"
635        } else {
636            "false"
637        },
638    );
639    span.set_attribute(
640        "langfuse.trace.metadata.nerdstats.token_metrics_source",
641        if token_metrics_available {
642            "provider_usage"
643        } else {
644            "provider_stream_usage_unavailable"
645        },
646    );
647    span.set_attribute(
648        "langfuse.trace.metadata.nerdstats.llm_nodes_without_usage_count",
649        llm_nodes_without_usage_count.to_string().as_str(),
650    );
651}
652
653fn apply_langfuse_trace_input_output_attributes(
654    span: &mut dyn WorkflowSpan,
655    workflow_input: &Value,
656    output: &YamlWorkflowRunOutput,
657    payload_mode: YamlWorkflowPayloadMode,
658) {
659    let trace_input = payload_for_span(payload_mode, workflow_input);
660    span.set_attribute("langfuse.trace.input", trace_input.as_str());
661
662    if let Some(terminal_output) = output.terminal_output.as_ref() {
663        let trace_output = payload_for_span(payload_mode, terminal_output);
664        span.set_attribute("langfuse.trace.output", trace_output.as_str());
665    }
666
667    let usage_details = json!({
668        "input": output.total_input_tokens,
669        "output": output.total_output_tokens,
670        "total": output.total_tokens,
671        "reasoning": output.total_reasoning_tokens,
672    })
673    .to_string();
674    span.set_attribute(
675        "langfuse.trace.metadata.usage_details",
676        usage_details.as_str(),
677    );
678}
679
680fn apply_langfuse_observation_usage_attributes(
681    span: &mut dyn WorkflowSpan,
682    usage: &YamlLlmTokenUsage,
683) {
684    let usage_details = json!({
685        "input": usage.prompt_tokens,
686        "output": usage.completion_tokens,
687        "total": usage.total_tokens,
688        "reasoning": usage.reasoning_tokens,
689    })
690    .to_string();
691    span.set_attribute("langfuse.observation.usage_details", usage_details.as_str());
692    span.set_attribute(
693        "gen_ai.usage.input_tokens",
694        usage.prompt_tokens.to_string().as_str(),
695    );
696    span.set_attribute(
697        "gen_ai.usage.output_tokens",
698        usage.completion_tokens.to_string().as_str(),
699    );
700    span.set_attribute(
701        "gen_ai.usage.total_tokens",
702        usage.total_tokens.to_string().as_str(),
703    );
704    if let Some(reasoning_tokens) = usage.reasoning_tokens {
705        span.set_attribute(
706            "gen_ai.usage.reasoning_tokens",
707            reasoning_tokens.to_string().as_str(),
708        );
709    }
710}
711
712fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
713    match mode {
714        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
715        YamlWorkflowPayloadMode::RedactedPayload => json!({
716            "redacted": true,
717            "value_type": match payload {
718                Value::Null => "null",
719                Value::Bool(_) => "bool",
720                Value::Number(_) => "number",
721                Value::String(_) => "string",
722                Value::Array(_) => "array",
723                Value::Object(_) => "object",
724            }
725        })
726        .to_string(),
727    }
728}
729
730fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
731    match mode {
732        YamlToolTraceMode::Full => payload.clone(),
733        YamlToolTraceMode::Redacted => json!({
734            "redacted": true,
735            "value_type": json_type_name(payload),
736        }),
737        YamlToolTraceMode::Off => Value::Null,
738    }
739}
740
741fn validate_json_schema(schema: &Value) -> Result<(), String> {
742    jsonschema::JSONSchema::compile(schema)
743        .map(|_| ())
744        .map_err(|error| format!("invalid JSON schema: {error}"))
745}
746
747fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
748    let validator = jsonschema::JSONSchema::compile(schema)
749        .map_err(|error| format!("invalid JSON schema: {error}"))?;
750    if let Err(errors) = validator.validate(instance) {
751        let message = errors
752            .into_iter()
753            .next()
754            .map(|error| error.to_string())
755            .unwrap_or_else(|| "unknown schema validation error".to_string());
756        return Err(format!("schema validation failed: {message}"));
757    }
758    Ok(())
759}
760
761fn schema_type(schema: &Value) -> Option<&str> {
762    schema.get("type").and_then(Value::as_str)
763}
764
765fn schema_expects_object(schema: &Value) -> bool {
766    schema_type(schema) == Some("object")
767}
768
769fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
770    options.trace.context.as_ref().map(|input| TraceContext {
771        trace_id: input.trace_id.clone(),
772        span_id: input.span_id.clone(),
773        parent_span_id: input.parent_span_id.clone(),
774        traceparent: input.traceparent.clone(),
775        tracestate: input.tracestate.clone(),
776        baggage: input.baggage.clone(),
777    })
778}
779
780fn merged_trace_context_for_worker(
781    span_context: Option<&TraceContext>,
782    resolved_trace_id: Option<&str>,
783    options: &YamlWorkflowRunOptions,
784) -> TraceContext {
785    let input_context = options.trace.context.as_ref();
786    let baggage = if let Some(context) = span_context {
787        if !context.baggage.is_empty() {
788            context.baggage.clone()
789        } else {
790            input_context
791                .map(|value| value.baggage.clone())
792                .unwrap_or_default()
793        }
794    } else {
795        input_context
796            .map(|value| value.baggage.clone())
797            .unwrap_or_default()
798    };
799
800    TraceContext {
801        trace_id: span_context
802            .and_then(|context| context.trace_id.clone())
803            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
804            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
805        span_id: span_context
806            .and_then(|context| context.span_id.clone())
807            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
808        parent_span_id: span_context
809            .and_then(|context| context.parent_span_id.clone())
810            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
811        traceparent: span_context
812            .and_then(|context| context.traceparent.clone())
813            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
814        tracestate: span_context
815            .and_then(|context| context.tracestate.clone())
816            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
817        baggage,
818    }
819}
820
821fn custom_worker_context_with_trace(
822    context: &Value,
823    trace_context: &TraceContext,
824    tenant_context: &YamlWorkflowTraceTenantContext,
825) -> Value {
826    let mut context_with_trace = context.clone();
827    let Some(root) = context_with_trace.as_object_mut() else {
828        return context_with_trace;
829    };
830
831    root.insert(
832        "trace".to_string(),
833        json!({
834            "context": {
835                "trace_id": trace_context.trace_id,
836                "span_id": trace_context.span_id,
837                "parent_span_id": trace_context.parent_span_id,
838                "traceparent": trace_context.traceparent,
839                "tracestate": trace_context.tracestate,
840                "baggage": trace_context.baggage,
841            },
842            "tenant": {
843                "workspace_id": tenant_context.workspace_id,
844                "user_id": tenant_context.user_id,
845                "conversation_id": tenant_context.conversation_id,
846                "request_id": tenant_context.request_id,
847                "run_id": tenant_context.run_id,
848            }
849        }),
850    );
851
852    context_with_trace
853}
854
/// Reads the `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` environment flag.
///
/// The flag is on when the variable, trimmed and compared case-insensitively,
/// equals `1`, `true`, `yes` or `on`. An unset or unreadable variable, or any
/// other value, means off.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|raw| {
            matches!(
                raw.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
864
/// A payload resolved from streamed output, with an optional healing confidence.
// NOTE(review): the producer is outside this part of the file; `heal_confidence`
// presumably comes from the JSON healing parser — confirm.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    heal_confidence: Option<f32>,
}
870
/// Buffers streamed JSON text and renders it as plain text exactly once.
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    /// Concatenation of every chunk pushed so far.
    raw_json: String,
    /// Set once the text rendering has been produced; later requests yield nothing.
    emitted: bool,
}
876
877impl StreamJsonAsTextFormatter {
878    fn push(&mut self, chunk: &str) {
879        self.raw_json.push_str(chunk);
880    }
881
882    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
883        if self.emitted || !complete {
884            return None;
885        }
886        self.emitted = true;
887        Some(render_json_object_as_text(self.raw_json.as_str()))
888    }
889}
890
891fn render_json_object_as_text(raw_json: &str) -> String {
892    let value = match serde_json::from_str::<Value>(raw_json) {
893        Ok(value) => value,
894        Err(_) => return raw_json.to_string(),
895    };
896    let Some(object) = value.as_object() else {
897        return raw_json.to_string();
898    };
899
900    let mut lines = Vec::with_capacity(object.len());
901    for (key, value) in object {
902        let rendered = match value {
903            Value::String(text) => text.clone(),
904            _ => value.to_string(),
905        };
906        lines.push(format!("{key}: {rendered}"));
907    }
908    lines.join("\n")
909}
910
/// Classification of a streamed token; serialized as `output` or `thinking`.
// NOTE(review): consumers are outside this part of the file; presumably this
// mirrors the output/thinking split produced by `StructuredJsonDeltaFilter` — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
917
/// Scanner state used by `split` to separate the first top-level JSON object in
/// a character stream (output) from everything before and after it (thinking).
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    /// Set once the first `{` has been seen; earlier characters count as thinking.
    started: bool,
    /// Set when the brace depth returns to zero; later characters count as thinking.
    completed: bool,
    /// Current `{`/`}` nesting depth, counted outside string literals only.
    depth: u32,
    /// True while the scanner is inside a double-quoted string.
    in_string: bool,
    /// True right after a backslash inside a string; the next character is
    /// passed through without being interpreted.
    escape: bool,
}
926
927impl StructuredJsonDeltaFilter {
928    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
929        if delta.is_empty() {
930            return (None, None);
931        }
932
933        let mut output = String::new();
934        let mut thinking = String::new();
935
936        for ch in delta.chars() {
937            if self.completed {
938                thinking.push(ch);
939                continue;
940            }
941
942            if !self.started {
943                if ch != '{' {
944                    thinking.push(ch);
945                    continue;
946                }
947                self.started = true;
948                self.depth = 1;
949                output.push(ch);
950                continue;
951            }
952
953            output.push(ch);
954            if self.in_string {
955                if self.escape {
956                    self.escape = false;
957                    continue;
958                }
959                if ch == '\\' {
960                    self.escape = true;
961                    continue;
962                }
963                if ch == '"' {
964                    self.in_string = false;
965                }
966                continue;
967            }
968
969            match ch {
970                '"' => self.in_string = true,
971                '{' => self.depth = self.depth.saturating_add(1),
972                '}' => {
973                    if self.depth > 0 {
974                        self.depth -= 1;
975                    }
976                    if self.depth == 0 {
977                        self.completed = true;
978                    }
979                }
980                _ => {}
981            }
982        }
983
984        let output = if output.is_empty() {
985            None
986        } else {
987            Some(output)
988        };
989        let thinking = if thinking.is_empty() {
990            None
991        } else {
992            Some(thinking)
993        };
994
995        (output, thinking)
996    }
997
998    fn completed(&self) -> bool {
999        self.completed
1000    }
1001}
1002
/// Returns the trimmed contents of the last ```` ```json ```` fenced block in `raw`.
///
/// Yields `None` when there is no opening fence, when the last opening fence
/// has no closing ```` ``` ```` after it, or when the fenced text is blank.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const OPENING_FENCE: &str = "```json";
    const CLOSING_FENCE: &str = "```";

    let (_, after_opening) = raw.rsplit_once(OPENING_FENCE)?;
    let (fenced, _) = after_opening.split_once(CLOSING_FENCE)?;
    let body = fenced.trim();
    (!body.is_empty()).then_some(body)
}
1013
/// Scans `raw` from byte offset `start_index` and returns the (trimmed) slice
/// that ends at the `}` which brings the brace depth back to zero.
///
/// Braces inside JSON string literals are ignored, including after escaped
/// quotes. Returns `None` when:
/// * `start_index` is past the end of `raw` or not on a character boundary
///   (previously this panicked on the slice operation),
/// * a `}` is met before any `{`, or
/// * the input ends before the braces balance.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    // `str::get` rejects invalid offsets instead of panicking like `raw[start_index..]`.
    let tail = raw.get(start_index..)?;

    let mut depth = 0u32;
    let mut in_string = false;
    let mut escape = false;

    for (offset, ch) in tail.char_indices() {
        if in_string {
            if escape {
                escape = false;
            } else if ch == '\\' {
                escape = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                // A closing brace with nothing open means there is no object here.
                depth = depth.checked_sub(1)?;
                if depth == 0 {
                    let end = offset + ch.len_utf8();
                    return Some(tail[..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
1054
1055fn extract_last_parsable_object(raw: &str) -> Option<&str> {
1056    let starts: Vec<usize> = raw
1057        .char_indices()
1058        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
1059        .collect();
1060
1061    for start in starts.into_iter().rev() {
1062        let Some(candidate) = extract_balanced_object_from(raw, start) else {
1063            continue;
1064        };
1065        if serde_json::from_str::<Value>(candidate).is_ok() {
1066            return Some(candidate);
1067        }
1068    }
1069
1070    None
1071}
1072
1073fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
1074    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
1075}
1076
1077fn parse_streamed_structured_payload(
1078    raw: &str,
1079    heal: bool,
1080) -> Result<StreamedPayloadResolution, String> {
1081    if !heal {
1082        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
1083            return Ok(StreamedPayloadResolution {
1084                payload,
1085                heal_confidence: None,
1086            });
1087        }
1088
1089        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
1090            "failed to parse streamed structured completion JSON: no JSON object candidate found"
1091                .to_string()
1092        })?;
1093        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
1094            format!(
1095                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
1096            )
1097        })?;
1098        return Ok(StreamedPayloadResolution {
1099            payload,
1100            heal_confidence: None,
1101        });
1102    }
1103
1104    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
1105    let parser = JsonishParser::new();
1106    let healed = parser
1107        .parse(candidate)
1108        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;
1109
1110    Ok(StreamedPayloadResolution {
1111        payload: healed.value,
1112        heal_confidence: Some(healed.confidence),
1113    })
1114}
1115
/// Event record handed to `YamlWorkflowEventSink::emit`.
///
/// Only `event_type` is mandatory. Which optional fields are populated for a
/// given event type is decided by the emitting code, which is outside this
/// section — NOTE(review): confirm field semantics against the emitters.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Name of the event.
    pub event_type: String,
    /// Id of the node the event refers to, if any.
    pub node_id: Option<String>,
    /// Step identifier, if any.
    pub step_id: Option<String>,
    /// Kind of the node the event refers to, if any.
    pub node_kind: Option<String>,
    /// Streamability flag, when the emitter provides one.
    pub streamable: Option<bool>,
    /// Free-form message text.
    pub message: Option<String>,
    /// Streamed text fragment.
    pub delta: Option<String>,
    /// Classification of `delta` as output or thinking.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    /// Presumably marks tokens produced by a terminal node — TODO confirm.
    pub is_terminal_node_token: Option<bool>,
    /// Elapsed time in milliseconds, when reported.
    pub elapsed_ms: Option<u128>,
    /// Additional JSON metadata.
    pub metadata: Option<Value>,
}
1130
/// Role of a [`WorkflowMessage`]; an alias for `simple_agent_type::message::Role`.
pub type WorkflowMessageRole = Role;
1132
/// A single chat message in (de)serializable form.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    /// Role of the message author.
    pub role: WorkflowMessageRole,
    /// Message text.
    pub content: String,
    /// Optional name; defaults to `None` when absent from the input.
    #[serde(default)]
    pub name: Option<String>,
    /// Optional tool call id; defaults to `None` when absent and is also
    /// accepted under the key `toolCallId` when deserializing.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
1142
/// Serializable description of one template expression and the value it
/// resolved to. NOTE(review): the code that fills this in is outside this
/// section; field notes below are inferred from the declarations only.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the binding — presumably within the template; TODO confirm.
    pub index: usize,
    /// The template expression text.
    pub expression: String,
    /// Path the expression reads from.
    pub source_path: String,
    /// The resolved JSON value.
    pub resolved: Value,
    /// Name of the resolved value's type.
    pub resolved_type: String,
    /// Whether the source value was missing.
    pub missing: bool,
}
1152
/// Severity level of a [`YamlWorkflowDiagnostic`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    /// The diagnostic reports an error.
    Error,
    /// The diagnostic reports a warning.
    Warning,
}
1158
/// A single workflow diagnostic; carried by `YamlWorkflowRunError::Validation`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Node the diagnostic refers to, if it is tied to one.
    pub node_id: Option<String>,
    /// Diagnostic code.
    pub code: String,
    /// Whether this is an error or a warning.
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description.
    pub message: String,
}
1166
/// Errors produced while loading, validating, rendering or running a YAML
/// workflow. The `#[error]` attribute on each variant is its display message.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    /// A file or directory at `path` could not be read.
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    /// The file at `path` is not a valid workflow YAML document.
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    /// The workflow declares no nodes.
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    /// The configured entry node is not among the workflow's nodes.
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    /// A referenced node id is unknown.
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    /// The node's type is not supported.
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    /// A switch condition uses a format that is not supported.
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    /// A switch node did not yield a valid next target.
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    /// The LLM returned a payload that is not a JSON object.
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    /// The named custom worker handler is not supported.
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    /// LLM execution failed for a node.
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    /// Custom worker execution failed for a node.
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Validation failed; `diagnostics` carries the individual findings.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    /// The workflow input was invalid.
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    /// The IR runtime reported a failure.
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    /// The event stream was cancelled.
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1211
/// Receiver for [`YamlWorkflowEvent`]s. Implementations must be `Send + Sync`.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called with each emitted event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Reports whether the sink has asked for cancellation.
    ///
    /// The default implementation never cancels.
    fn is_cancelled(&self) -> bool {
        false
    }
}
1219
/// Event sink that discards every event and, through the trait's default
/// `is_cancelled`, never requests cancellation.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    /// Intentionally does nothing with the event.
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1225
/// Fixed message text describing a cancellation requested by the event sink.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const CANCELLED_MESSAGE: &str = "workflow event callback cancelled";
    CANCELLED_MESSAGE
}
1229
1230fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1231    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1232}
1233
/// Reasons a YAML workflow cannot be converted to the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    /// The workflow's entry node is not among its declared nodes.
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    /// An llm/tool node has more than one outgoing edge, which the IR cannot express.
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    /// The node uses a feature or node type the IR has no representation for.
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1243
1244/// Render a YAML workflow graph as Mermaid flowchart.
1245pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1246    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1247        return workflow_to_mermaid(&ir);
1248    }
1249
1250    yaml_workflow_to_mermaid_fallback(workflow)
1251}
1252
1253fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
1254    let mut lines = Vec::new();
1255    lines.push("flowchart TD".to_string());
1256    let mut tool_node_ids: Vec<String> = Vec::new();
1257
1258    for node in &workflow.nodes {
1259        let label = format!("{}\\n({})", node.id, node.kind_name());
1260        lines.push(format!(
1261            "  {}[\"{}\"]",
1262            sanitize_mermaid_id(&node.id),
1263            escape_mermaid_label(label.as_str()),
1264        ));
1265
1266        if let Some(llm) = node.node_type.llm_call.as_ref() {
1267            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
1268                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
1269                lines.push(format!(
1270                    "  {}([\"{}\"])",
1271                    tool_id,
1272                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
1273                ));
1274                lines.push(format!(
1275                    "  {} -.-> {}",
1276                    sanitize_mermaid_id(&node.id),
1277                    tool_id
1278                ));
1279                tool_node_ids.push(tool_id);
1280            }
1281        }
1282    }
1283
1284    let mut emitted: HashSet<(String, String, String)> = HashSet::new();
1285
1286    for edge in &workflow.edges {
1287        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
1288    }
1289
1290    for node in &workflow.nodes {
1291        if let Some(switch) = node.node_type.switch.as_ref() {
1292            for branch in &switch.branches {
1293                emitted.insert((
1294                    node.id.clone(),
1295                    branch.condition.clone(),
1296                    branch.target.clone(),
1297                ));
1298            }
1299            emitted.insert((
1300                node.id.clone(),
1301                "default".to_string(),
1302                switch.default.clone(),
1303            ));
1304        }
1305    }
1306
1307    let mut edges = emitted.into_iter().collect::<Vec<_>>();
1308    edges.sort();
1309
1310    for (from, label, to) in edges {
1311        if label.is_empty() {
1312            lines.push(format!(
1313                "  {} --> {}",
1314                sanitize_mermaid_id(&from),
1315                sanitize_mermaid_id(&to)
1316            ));
1317        } else {
1318            lines.push(format!(
1319                "  {} -- \"{}\" --> {}",
1320                sanitize_mermaid_id(&from),
1321                escape_mermaid_label(&label),
1322                sanitize_mermaid_id(&to)
1323            ));
1324        }
1325    }
1326
1327    if !tool_node_ids.is_empty() {
1328        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
1329        lines.push(format!("  class {} toolNode;", tool_node_ids.join(",")));
1330    }
1331
1332    lines.join("\n")
1333}
1334
1335fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1336    llm.tools
1337        .iter()
1338        .map(|tool| match tool {
1339            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1340            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1341        })
1342        .collect()
1343}
1344
1345/// Load a YAML workflow file and render it as Mermaid flowchart.
1346pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
1347    let contents =
1348        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1349            path: workflow_path.display().to_string(),
1350            source,
1351        })?;
1352
1353    let workflow: YamlWorkflow =
1354        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1355            path: workflow_path.display().to_string(),
1356            source,
1357        })?;
1358
1359    let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
1360    if referenced_subgraphs.is_empty() {
1361        return Ok(yaml_workflow_to_mermaid(&workflow));
1362    }
1363
1364    Ok(yaml_workflow_to_mermaid_with_subgraphs(
1365        &workflow,
1366        &referenced_subgraphs,
1367    ))
1368}
1369
/// A referenced workflow that is drawn as its own Mermaid subgraph.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    /// The workflow id as it was written at the reference site; used as the
    /// subgraph title.
    alias: String,
    /// The referenced workflow definition.
    workflow: YamlWorkflow,
}
1375
/// Output of rendering one workflow as a block of Mermaid statements.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    /// Mermaid statements for the block, without leading indentation.
    lines: Vec<String>,
    /// Ids of every tool node drawn in the block.
    tool_node_ids: Vec<String>,
    /// Subset of tool node ids whose tool is named `run_workflow_graph`.
    run_workflow_tool_node_ids: Vec<String>,
    /// Prefixed Mermaid id of the workflow's entry node.
    entry_node_id: String,
}
1383
1384fn yaml_workflow_to_mermaid_with_subgraphs(
1385    workflow: &YamlWorkflow,
1386    subgraphs: &[MermaidSubgraphWorkflow],
1387) -> String {
1388    let mut lines = vec!["flowchart TD".to_string()];
1389
1390    let main_block = render_mermaid_block(workflow, "main");
1391    lines.push(format!(
1392        "  subgraph main_graph[\"Main: {}\"]",
1393        escape_mermaid_label(&workflow.id)
1394    ));
1395    for line in &main_block.lines {
1396        lines.push(format!("    {line}"));
1397    }
1398    lines.push("  end".to_string());
1399
1400    let mut all_tool_nodes = main_block.tool_node_ids.clone();
1401
1402    for (index, subgraph) in subgraphs.iter().enumerate() {
1403        let block_id = format!("subgraph_{}", index + 1);
1404        let block = render_mermaid_block(&subgraph.workflow, &block_id);
1405        lines.push(format!(
1406            "  subgraph {}[\"Subgraph: {}\"]",
1407            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
1408            escape_mermaid_label(&subgraph.alias)
1409        ));
1410        for line in &block.lines {
1411            lines.push(format!("    {line}"));
1412        }
1413        lines.push("  end".to_string());
1414
1415        for tool_node in &main_block.run_workflow_tool_node_ids {
1416            lines.push(format!(
1417                "  {} -. \"{}\" .-> {}",
1418                tool_node,
1419                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
1420                block.entry_node_id
1421            ));
1422        }
1423
1424        all_tool_nodes.extend(block.tool_node_ids);
1425    }
1426
1427    if !all_tool_nodes.is_empty() {
1428        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
1429        lines.push(format!("  class {} toolNode;", all_tool_nodes.join(",")));
1430    }
1431
1432    lines.join("\n")
1433}
1434
1435fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
1436    let mut block = MermaidBlockRender {
1437        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
1438        ..Default::default()
1439    };
1440
1441    for node in &workflow.nodes {
1442        let node_id = prefixed_mermaid_id(prefix, &node.id);
1443        let label = format!("{}\\n({})", node.id, node.kind_name());
1444        block
1445            .lines
1446            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));
1447
1448        if let Some(llm) = node.node_type.llm_call.as_ref() {
1449            for (idx, tool) in llm.tools.iter().enumerate() {
1450                let tool_name = tool_declaration_name(tool);
1451                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
1452                block.lines.push(format!(
1453                    "{}([\"{}\"])",
1454                    tool_id,
1455                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
1456                ));
1457                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
1458                block.tool_node_ids.push(tool_id.clone());
1459
1460                if tool_name == "run_workflow_graph" {
1461                    block.run_workflow_tool_node_ids.push(tool_id);
1462                }
1463            }
1464        }
1465    }
1466
1467    let mut emitted: HashSet<(String, String, String)> = HashSet::new();
1468
1469    for edge in &workflow.edges {
1470        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
1471    }
1472
1473    for node in &workflow.nodes {
1474        if let Some(switch) = node.node_type.switch.as_ref() {
1475            for branch in &switch.branches {
1476                emitted.insert((
1477                    node.id.clone(),
1478                    branch.condition.clone(),
1479                    branch.target.clone(),
1480                ));
1481            }
1482            emitted.insert((
1483                node.id.clone(),
1484                "default".to_string(),
1485                switch.default.clone(),
1486            ));
1487        }
1488    }
1489
1490    let mut edges = emitted.into_iter().collect::<Vec<_>>();
1491    edges.sort();
1492
1493    for (from, label, to) in edges {
1494        let from_id = prefixed_mermaid_id(prefix, &from);
1495        let to_id = prefixed_mermaid_id(prefix, &to);
1496        if label.is_empty() {
1497            block.lines.push(format!("{} --> {}", from_id, to_id));
1498        } else {
1499            block.lines.push(format!(
1500                "{} -- \"{}\" --> {}",
1501                from_id,
1502                escape_mermaid_label(&label),
1503                to_id
1504            ));
1505        }
1506    }
1507
1508    block
1509}
1510
1511fn discover_referenced_subgraphs(
1512    workflow_path: &Path,
1513    workflow: &YamlWorkflow,
1514) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
1515    let workflow_ids = referenced_workflow_ids(workflow);
1516    if workflow_ids.is_empty() {
1517        return Ok(Vec::new());
1518    }
1519
1520    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
1521    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;
1522
1523    let mut discovered = Vec::new();
1524    let mut seen = HashSet::new();
1525
1526    for workflow_id in workflow_ids {
1527        let normalized = normalize_workflow_lookup_key(&workflow_id);
1528        if seen.contains(&normalized) {
1529            continue;
1530        }
1531
1532        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
1533        {
1534            discovered.push(MermaidSubgraphWorkflow {
1535                alias: workflow_id.clone(),
1536                workflow: subworkflow.clone(),
1537            });
1538            seen.insert(normalized);
1539        }
1540    }
1541
1542    Ok(discovered)
1543}
1544
1545fn load_yaml_sibling_workflows(
1546    parent_dir: &Path,
1547    workflow_path: &Path,
1548) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
1549    let mut results = Vec::new();
1550    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
1551        path: parent_dir.display().to_string(),
1552        source,
1553    })?;
1554
1555    for entry in entries {
1556        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
1557            path: parent_dir.display().to_string(),
1558            source,
1559        })?;
1560        let path = entry.path();
1561        if !is_yaml_file(&path) {
1562            continue;
1563        }
1564
1565        if path == workflow_path {
1566            continue;
1567        }
1568
1569        let contents =
1570            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
1571                path: path.display().to_string(),
1572                source,
1573            })?;
1574        let subworkflow: YamlWorkflow =
1575            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1576                path: path.display().to_string(),
1577                source,
1578            })?;
1579
1580        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
1581            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
1582        }
1583        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
1584    }
1585
1586    Ok(results)
1587}
1588
1589fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
1590    let mut ids = Vec::new();
1591    let mut seen = HashSet::new();
1592
1593    for node in &workflow.nodes {
1594        let prompt = node
1595            .config
1596            .as_ref()
1597            .and_then(|config| config.prompt.as_deref());
1598
1599        if let Some(llm) = node.node_type.llm_call.as_ref() {
1600            for tool in &llm.tools {
1601                if tool_declaration_name(tool) != "run_workflow_graph" {
1602                    continue;
1603                }
1604
1605                for workflow_id in referenced_workflow_ids_from_tool(tool) {
1606                    let normalized = normalize_workflow_lookup_key(&workflow_id);
1607                    if seen.insert(normalized) {
1608                        ids.push(workflow_id);
1609                    }
1610                }
1611
1612                if let Some(prompt_text) = prompt {
1613                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
1614                        let normalized = normalize_workflow_lookup_key(&workflow_id);
1615                        if seen.insert(normalized) {
1616                            ids.push(workflow_id);
1617                        }
1618                    }
1619                }
1620            }
1621        }
1622    }
1623
1624    ids
1625}
1626
1627fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1628    let schema = match tool {
1629        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1630        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1631    };
1632
1633    let mut ids = Vec::new();
1634    if let Some(schema) = schema {
1635        if let Some(workflow_prop) = schema
1636            .get("properties")
1637            .and_then(Value::as_object)
1638            .and_then(|properties| properties.get("workflow_id"))
1639        {
1640            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1641                ids.push(value.to_string());
1642            }
1643
1644            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1645                for value in enum_values {
1646                    if let Some(value) = value.as_str() {
1647                        ids.push(value.to_string());
1648                    }
1649                }
1650            }
1651        }
1652    }
1653
1654    ids
1655}
1656
/// Scans prompt text for `"workflow_id": "<value>"` pairs and returns the
/// non-blank values in order of appearance.
///
/// After each `"workflow_id"` key the next `:` is located; if the text after
/// it (ignoring leading whitespace) is a double-quoted string, its trimmed
/// contents are collected. Scanning stops at a key with no `:` after it.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut found = Vec::new();
    let mut cursor = prompt;

    while let Some((_, after_key)) = cursor.split_once(KEY) {
        let Some((_, after_colon)) = after_key.split_once(':') else {
            break;
        };

        let quoted = after_colon
            .trim_start()
            .strip_prefix('"')
            .and_then(|body| body.split_once('"'));

        cursor = match quoted {
            Some((literal, tail)) => {
                let workflow_id = literal.trim();
                if !workflow_id.is_empty() {
                    found.push(workflow_id.to_string());
                }
                tail
            }
            // Not a complete quoted value: keep scanning right after the colon.
            None => after_colon,
        };
    }

    found
}
1684
/// Canonical lookup form of a workflow name: hyphens become underscores and
/// ASCII letters are lower-cased. Non-ASCII characters are left untouched.
fn normalize_workflow_lookup_key(value: &str) -> String {
    let underscored = value.replace('-', "_");
    underscored.to_ascii_lowercase()
}
1694
/// Whether `path` has the extension `yaml` or `yml` (exact, case-sensitive match).
fn is_yaml_file(path: &Path) -> bool {
    const YAML_EXTENSIONS: [&str; 2] = ["yaml", "yml"];

    let Some(extension) = path.extension().and_then(|ext| ext.to_str()) else {
        return false;
    };
    YAML_EXTENSIONS.contains(&extension)
}
1701
1702fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1703    sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1704}
1705
1706fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
1707    match tool {
1708        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
1709        YamlToolDeclaration::Simplified(simple) => &simple.name,
1710    }
1711}
1712
1713pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
1714    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
1715    if !known_ids.contains(workflow.entry_node.as_str()) {
1716        return Err(YamlToIrError::MissingEntry {
1717            entry_node: workflow.entry_node.clone(),
1718        });
1719    }
1720
1721    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
1722    for edge in &workflow.edges {
1723        outgoing
1724            .entry(edge.from.as_str())
1725            .or_default()
1726            .push(edge.to.as_str());
1727    }
1728
1729    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
1730    nodes.push(Node {
1731        id: YAML_START_NODE_ID.to_string(),
1732        kind: NodeKind::Start {
1733            next: workflow.entry_node.clone(),
1734        },
1735    });
1736
1737    for node in &workflow.nodes {
1738        if let Some(llm) = node.node_type.llm_call.as_ref() {
1739            if node
1740                .config
1741                .as_ref()
1742                .and_then(|c| c.set_globals.as_ref())
1743                .is_some()
1744                || node
1745                    .config
1746                    .as_ref()
1747                    .and_then(|c| c.update_globals.as_ref())
1748                    .is_some()
1749            {
1750                return Err(YamlToIrError::UnsupportedNode {
1751                    node_id: node.id.clone(),
1752                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
1753                        .to_string(),
1754                });
1755            }
1756
1757            if !llm.tools.is_empty() {
1758                return Err(YamlToIrError::UnsupportedNode {
1759                    node_id: node.id.clone(),
1760                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
1761                        .to_string(),
1762                });
1763            }
1764
1765            let next = single_next_for_node(&outgoing, &node.id)?;
1766            nodes.push(Node {
1767                id: node.id.clone(),
1768                kind: NodeKind::Tool {
1769                    tool: YAML_LLM_TOOL_ID.to_string(),
1770                    input: json!({
1771                        "node_id": node.id,
1772                        "model": llm.model,
1773                        "prompt_template": node
1774                            .config
1775                            .as_ref()
1776                            .and_then(|c| c.prompt.clone())
1777                            .unwrap_or_default(),
1778                        "stream": llm.stream.unwrap_or(false),
1779                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
1780                        "heal": llm.heal.unwrap_or(false),
1781                        "messages_path": llm.messages_path,
1782                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
1783                        "output_schema": node
1784                            .config
1785                            .as_ref()
1786                            .and_then(|c| c.output_schema.clone())
1787                            .unwrap_or_else(default_llm_output_schema),
1788                    }),
1789                    next,
1790                },
1791            });
1792            continue;
1793        }
1794
1795        if let Some(worker) = node.node_type.custom_worker.as_ref() {
1796            if node
1797                .config
1798                .as_ref()
1799                .and_then(|c| c.set_globals.as_ref())
1800                .is_some()
1801                || node
1802                    .config
1803                    .as_ref()
1804                    .and_then(|c| c.update_globals.as_ref())
1805                    .is_some()
1806            {
1807                return Err(YamlToIrError::UnsupportedNode {
1808                    node_id: node.id.clone(),
1809                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
1810                        .to_string(),
1811                });
1812            }
1813
1814            let next = single_next_for_node(&outgoing, &node.id)?;
1815            nodes.push(Node {
1816                id: node.id.clone(),
1817                kind: NodeKind::Tool {
1818                    tool: worker.handler.clone(),
1819                    input: node
1820                        .config
1821                        .as_ref()
1822                        .and_then(|c| c.payload.clone())
1823                        .unwrap_or_else(|| json!({})),
1824                    next,
1825                },
1826            });
1827            continue;
1828        }
1829
1830        if let Some(switch) = node.node_type.switch.as_ref() {
1831            nodes.push(Node {
1832                id: node.id.clone(),
1833                kind: NodeKind::Router {
1834                    routes: switch
1835                        .branches
1836                        .iter()
1837                        .map(|b| RouterRoute {
1838                            when: rewrite_yaml_condition_to_ir(&b.condition),
1839                            next: b.target.clone(),
1840                        })
1841                        .collect(),
1842                    default: switch.default.clone(),
1843                },
1844            });
1845            continue;
1846        }
1847
1848        return Err(YamlToIrError::UnsupportedNode {
1849            node_id: node.id.clone(),
1850            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
1851        });
1852    }
1853
1854    Ok(WorkflowDefinition {
1855        version: WORKFLOW_IR_V0.to_string(),
1856        name: workflow.id.clone(),
1857        nodes,
1858    })
1859}
1860
1861fn single_next_for_node(
1862    outgoing: &HashMap<&str, Vec<&str>>,
1863    node_id: &str,
1864) -> Result<Option<String>, YamlToIrError> {
1865    match outgoing.get(node_id) {
1866        None => Ok(None),
1867        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1868        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1869            node_id: node_id.to_string(),
1870        }),
1871    }
1872}
1873
/// Translates a YAML-style condition expression into the IR path syntax.
///
/// Every `$.nodes.` occurrence becomes `$.node_outputs.`, every `.output.`
/// segment collapses to a single `.`, and a trailing `.output` is dropped.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    const TRAILING_OUTPUT: &str = ".output";

    let with_ir_root = expr.replace("$.nodes.", "$.node_outputs.");
    let mut flattened = with_ir_root.replace(".output.", ".");
    if flattened.ends_with(TRAILING_OUTPUT) {
        // The suffix is ASCII, so truncating by its byte length is safe.
        flattened.truncate(flattened.len() - TRAILING_OUTPUT.len());
    }
    flattened
}
1884
/// Converts an arbitrary node id into an identifier made only of ASCII
/// letters, digits and underscores.
///
/// Ids that do not start with an ASCII letter or `_` (including the empty
/// string) are prefixed with `n_`; every remaining character outside
/// `[A-Za-z0-9_]` is replaced by `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_like_identifier = matches!(
        id.chars().next(),
        Some(first) if first.is_ascii_alphabetic() || first == '_'
    );
    let prefix = if starts_like_identifier { "" } else { "n_" };

    prefix
        .chars()
        .chain(id.chars())
        .map(|ch| match ch {
            keep if keep.is_ascii_alphanumeric() || keep == '_' => keep,
            _ => '_',
        })
        .collect()
}
1908
/// Escapes a label so it can be embedded in Mermaid diagram text.
///
/// Mermaid does not interpret backslash escapes inside labels: a `"` always
/// terminates a quoted string. Double quotes are therefore emitted as the
/// `#quot;` entity code, which Mermaid renders as a literal quote.
fn escape_mermaid_label(label: &str) -> String {
    label.replace('"', "#quot;")
}
1912
/// Everything a [`YamlWorkflowLlmExecutor`] receives for one LLM node execution.
///
/// The per-field notes describe how the client-backed executor defined in this
/// file reads each value; fields without a note are not consumed in the code
/// visible next to this definition.
1913#[derive(Debug, Clone)]
1914pub struct YamlLlmExecutionRequest {
    /// Node identifier; reported as both `node_id` and `step_id` on stream events.
1915    pub node_id: String,
    /// Forwarded as `is_terminal_node_token` on stream events.
1916    pub is_terminal_node: bool,
    /// When true, streamed output chunks pass through a JSON-as-text formatter
    /// before being emitted.
1917    pub stream_json_as_text: bool,
    /// Model name passed to the completion request builder.
1918    pub model: String,
    /// Optional conversation history. When `None`, a default system message
    /// plus `prompt` as a user message is sent instead.
1919    pub messages: Option<Vec<Message>>,
    /// With `messages` present, appends `prompt` as a trailing user message
    /// unless the prompt is blank.
1920    pub append_prompt_as_user: bool,
    /// Prompt text sent as a user message.
1921    pub prompt: String,
    // NOTE(review): presumably the un-rendered template behind `prompt` and the
    // bindings used to render it — confirm against the code that builds requests.
1922    pub prompt_template: String,
1923    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// Output schema. In the tool-calling path it is attached as the
    /// `workflow_step` JSON schema when `heal` is set and the schema expects an
    /// object.
1924    pub schema: Value,
    /// Requests a streaming completion when true.
1925    pub stream: bool,
    /// Gates attaching `schema` to the completion request (see `schema`).
1926    pub heal: bool,
    /// Tools offered to the model; a non-empty list switches the executor to
    /// its tool-calling loop.
1927    pub tools: Vec<YamlResolvedTool>,
    /// Optional tool-choice directive forwarded to the completion request.
1928    pub tool_choice: Option<ToolChoice>,
    /// Bound of the tool loop, which iterates over `0..=max_tool_roundtrips`.
1929    pub max_tool_roundtrips: u8,
1930    pub tool_calls_global_key: Option<String>,
1931    pub tool_trace_mode: YamlToolTraceMode,
1932    pub execution_context: Value,
1933    pub email_text: String,
1934    pub trace_id: Option<String>,
1935    pub trace_context: Option<TraceContext>,
1936    pub tenant_context: YamlWorkflowTraceTenantContext,
1937    pub trace_sampled: bool,
1938}
1939
/// A tool made available to an LLM node, as carried in
/// [`YamlLlmExecutionRequest::tools`].
1940#[derive(Debug, Clone)]
1941pub struct YamlResolvedTool {
    /// Tool definition; the client-backed executor clones this into the
    /// completion request's tool list.
1942    pub definition: ToolDefinition,
    // NOTE(review): presumably a schema describing the tool's result — its use
    // is not visible alongside this definition; confirm.
1943    pub output_schema: Option<Value>,
1944}
1945
/// Backend that executes a single LLM step for the YAML workflow runner.
1946#[async_trait]
1947pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Runs `request` and returns the execution result, or an error message.
    ///
    /// `event_sink` is optional. The client-backed implementation in this file
    /// emits streaming delta events to it and checks it for cancellation while
    /// consuming a stream.
1948    async fn complete_structured(
1949        &self,
1950        request: YamlLlmExecutionRequest,
1951        event_sink: Option<&dyn YamlWorkflowEventSink>,
1952    ) -> Result<YamlLlmExecutionResult, String>;
1953}
1954
/// Pluggable executor that the `*_and_custom_worker` runners accept as an
/// optional argument.
1955#[async_trait]
1956pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Invokes the worker identified by `handler` and returns its JSON result,
    /// or an error message.
    ///
    // NOTE(review): presumably `handler` is the `custom_worker` handler name
    // from the YAML node and `payload` its configured payload — confirm against
    // the call site, which is outside this view.
1957    async fn execute(
1958        &self,
1959        handler: &str,
1960        payload: &Value,
1961        email_text: &str,
1962        context: &Value,
1963    ) -> Result<Value, String>;
1964}
1965
1966pub async fn run_workflow_yaml_file(
1967    workflow_path: &Path,
1968    workflow_input: &Value,
1969    executor: &dyn YamlWorkflowLlmExecutor,
1970) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1971    let contents =
1972        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1973            path: workflow_path.display().to_string(),
1974            source,
1975        })?;
1976
1977    let workflow: YamlWorkflow =
1978        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1979            path: workflow_path.display().to_string(),
1980            source,
1981        })?;
1982
1983    run_workflow_yaml(&workflow, workflow_input, executor).await
1984}
1985
1986pub async fn run_email_workflow_yaml_file(
1987    workflow_path: &Path,
1988    email_text: &str,
1989    executor: &dyn YamlWorkflowLlmExecutor,
1990) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1991    let workflow_input = json!({ "email_text": email_text });
1992    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1993}
1994
1995pub async fn run_workflow_yaml_file_with_client(
1996    workflow_path: &Path,
1997    workflow_input: &Value,
1998    client: &SimpleAgentsClient,
1999) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2000    let contents =
2001        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
2002            path: workflow_path.display().to_string(),
2003            source,
2004        })?;
2005
2006    let workflow: YamlWorkflow =
2007        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
2008            path: workflow_path.display().to_string(),
2009            source,
2010        })?;
2011
2012    run_workflow_yaml_with_client(&workflow, workflow_input, client).await
2013}
2014
2015pub async fn run_email_workflow_yaml_file_with_client(
2016    workflow_path: &Path,
2017    email_text: &str,
2018    client: &SimpleAgentsClient,
2019) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2020    let workflow_input = json!({ "email_text": email_text });
2021    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
2022}
2023
2024pub async fn run_workflow_yaml_with_client(
2025    workflow: &YamlWorkflow,
2026    workflow_input: &Value,
2027    client: &SimpleAgentsClient,
2028) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2029    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
2030}
2031
2032pub async fn run_email_workflow_yaml_with_client(
2033    workflow: &YamlWorkflow,
2034    email_text: &str,
2035    client: &SimpleAgentsClient,
2036) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2037    let workflow_input = json!({ "email_text": email_text });
2038    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
2039}
2040
2041pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
2042    workflow_path: &Path,
2043    workflow_input: &Value,
2044    client: &SimpleAgentsClient,
2045    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2046) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2047    let contents =
2048        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
2049            path: workflow_path.display().to_string(),
2050            source,
2051        })?;
2052
2053    let workflow: YamlWorkflow =
2054        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
2055            path: workflow_path.display().to_string(),
2056            source,
2057        })?;
2058
2059    run_workflow_yaml_with_client_and_custom_worker(
2060        &workflow,
2061        workflow_input,
2062        client,
2063        custom_worker,
2064    )
2065    .await
2066}
2067
2068pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
2069    workflow_path: &Path,
2070    email_text: &str,
2071    client: &SimpleAgentsClient,
2072    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2073) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2074    let workflow_input = json!({ "email_text": email_text });
2075    run_workflow_yaml_file_with_client_and_custom_worker(
2076        workflow_path,
2077        &workflow_input,
2078        client,
2079        custom_worker,
2080    )
2081    .await
2082}
2083
2084pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
2085    workflow_path: &Path,
2086    workflow_input: &Value,
2087    client: &SimpleAgentsClient,
2088    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2089    event_sink: Option<&dyn YamlWorkflowEventSink>,
2090) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2091    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
2092        workflow_path,
2093        workflow_input,
2094        client,
2095        custom_worker,
2096        event_sink,
2097        &YamlWorkflowRunOptions::default(),
2098    )
2099    .await
2100}
2101
2102pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
2103    workflow_path: &Path,
2104    workflow_input: &Value,
2105    client: &SimpleAgentsClient,
2106    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2107    event_sink: Option<&dyn YamlWorkflowEventSink>,
2108    options: &YamlWorkflowRunOptions,
2109) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2110    let contents =
2111        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
2112            path: workflow_path.display().to_string(),
2113            source,
2114        })?;
2115
2116    let workflow: YamlWorkflow =
2117        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
2118            path: workflow_path.display().to_string(),
2119            source,
2120        })?;
2121
2122    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2123        &workflow,
2124        workflow_input,
2125        client,
2126        custom_worker,
2127        event_sink,
2128        options,
2129    )
2130    .await
2131}
2132
2133pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
2134    workflow_path: &Path,
2135    email_text: &str,
2136    client: &SimpleAgentsClient,
2137    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2138    event_sink: Option<&dyn YamlWorkflowEventSink>,
2139) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2140    let workflow_input = json!({ "email_text": email_text });
2141    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
2142        workflow_path,
2143        &workflow_input,
2144        client,
2145        custom_worker,
2146        event_sink,
2147    )
2148    .await
2149}
2150
2151pub async fn run_workflow_yaml_with_client_and_custom_worker(
2152    workflow: &YamlWorkflow,
2153    workflow_input: &Value,
2154    client: &SimpleAgentsClient,
2155    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2156) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2157    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2158        workflow,
2159        workflow_input,
2160        client,
2161        custom_worker,
2162        None,
2163        &YamlWorkflowRunOptions::default(),
2164    )
2165    .await
2166}
2167
2168pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
2169    workflow: &YamlWorkflow,
2170    email_text: &str,
2171    client: &SimpleAgentsClient,
2172    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2173) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2174    let workflow_input = json!({ "email_text": email_text });
2175    run_workflow_yaml_with_client_and_custom_worker(
2176        workflow,
2177        &workflow_input,
2178        client,
2179        custom_worker,
2180    )
2181    .await
2182}
2183
2184pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2185    workflow: &YamlWorkflow,
2186    workflow_input: &Value,
2187    client: &SimpleAgentsClient,
2188    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2189    event_sink: Option<&dyn YamlWorkflowEventSink>,
2190) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2191    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2192        workflow,
2193        workflow_input,
2194        client,
2195        custom_worker,
2196        event_sink,
2197        &YamlWorkflowRunOptions::default(),
2198    )
2199    .await
2200}
2201
2202pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2203    workflow: &YamlWorkflow,
2204    workflow_input: &Value,
2205    client: &SimpleAgentsClient,
2206    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2207    event_sink: Option<&dyn YamlWorkflowEventSink>,
2208    options: &YamlWorkflowRunOptions,
2209) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2210    struct BorrowedClientExecutor<'a> {
2211        client: &'a SimpleAgentsClient,
2212        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2213        run_options: YamlWorkflowRunOptions,
2214    }
2215
2216    #[async_trait]
2217    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2218        async fn complete_structured(
2219            &self,
2220            request: YamlLlmExecutionRequest,
2221            event_sink: Option<&dyn YamlWorkflowEventSink>,
2222        ) -> Result<YamlLlmExecutionResult, String> {
2223            let expects_object = schema_expects_object(&request.schema);
2224            let messages = if let Some(mut history) = request.messages.clone() {
2225                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2226                    history.push(Message::user(&request.prompt));
2227                }
2228                history
2229            } else {
2230                vec![
2231                    Message::system("You execute workflow classification steps."),
2232                    Message::user(&request.prompt),
2233                ]
2234            };
2235
2236            if !request.tools.is_empty() {
2237                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2238                let mut conversation = messages;
2239                let mut usage_total: Option<YamlLlmTokenUsage> = None;
2240
2241                for roundtrip in 0..=request.max_tool_roundtrips {
2242                    let mut builder = CompletionRequest::builder()
2243                        .model(&request.model)
2244                        .messages(conversation.clone())
2245                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2246
2247                    if request.heal && expects_object {
2248                        builder = builder.json_schema("workflow_step", request.schema.clone());
2249                    }
2250
2251                    if let Some(choice) = request.tool_choice.clone() {
2252                        builder = builder.tool_choice(choice);
2253                    }
2254
2255                    if request.stream {
2256                        builder = builder.stream(true);
2257                    }
2258
2259                    let completion_request = builder
2260                        .build()
2261                        .map_err(|error| format!("failed to build completion request: {error}"))?;
2262
2263                    let outcome = self
2264                        .client
2265                        .complete(&completion_request, CompletionOptions::default())
2266                        .await
2267                        .map_err(|error| error.to_string())?;
2268
2269                    let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2270                    let mut streamed_content = String::new();
2271                    let mut finish_reason = FinishReason::Stop;
2272
2273                    match outcome {
2274                        CompletionOutcome::Response(response) => {
2275                            if let Some(usage) = usage_total.as_mut() {
2276                                usage.prompt_tokens += response.usage.prompt_tokens;
2277                                usage.completion_tokens += response.usage.completion_tokens;
2278                                usage.total_tokens += response.usage.total_tokens;
2279                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2280                                    usage.reasoning_tokens = Some(
2281                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2282                                    );
2283                                }
2284                            } else {
2285                                usage_total = Some(YamlLlmTokenUsage {
2286                                    prompt_tokens: response.usage.prompt_tokens,
2287                                    completion_tokens: response.usage.completion_tokens,
2288                                    total_tokens: response.usage.total_tokens,
2289                                    reasoning_tokens: response.usage.reasoning_tokens,
2290                                });
2291                            }
2292
2293                            let choice = response
2294                                .choices
2295                                .first()
2296                                .ok_or_else(|| "completion returned no choices".to_string())?;
2297                            streamed_content = choice.message.content.clone();
2298                            streamed_tool_calls = choice.message.tool_calls.clone();
2299                            finish_reason = choice.finish_reason;
2300                        }
2301                        CompletionOutcome::HealedJson(healed) => {
2302                            let response = healed.response;
2303                            if let Some(usage) = usage_total.as_mut() {
2304                                usage.prompt_tokens += response.usage.prompt_tokens;
2305                                usage.completion_tokens += response.usage.completion_tokens;
2306                                usage.total_tokens += response.usage.total_tokens;
2307                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2308                                    usage.reasoning_tokens = Some(
2309                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2310                                    );
2311                                }
2312                            } else {
2313                                usage_total = Some(YamlLlmTokenUsage {
2314                                    prompt_tokens: response.usage.prompt_tokens,
2315                                    completion_tokens: response.usage.completion_tokens,
2316                                    total_tokens: response.usage.total_tokens,
2317                                    reasoning_tokens: response.usage.reasoning_tokens,
2318                                });
2319                            }
2320
2321                            let choice = response
2322                                .choices
2323                                .first()
2324                                .ok_or_else(|| "completion returned no choices".to_string())?;
2325                            streamed_content = choice.message.content.clone();
2326                            streamed_tool_calls = choice.message.tool_calls.clone();
2327                            finish_reason = choice.finish_reason;
2328                        }
2329                        CompletionOutcome::CoercedSchema(coerced) => {
2330                            let response = coerced.response;
2331                            if let Some(usage) = usage_total.as_mut() {
2332                                usage.prompt_tokens += response.usage.prompt_tokens;
2333                                usage.completion_tokens += response.usage.completion_tokens;
2334                                usage.total_tokens += response.usage.total_tokens;
2335                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2336                                    usage.reasoning_tokens = Some(
2337                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2338                                    );
2339                                }
2340                            } else {
2341                                usage_total = Some(YamlLlmTokenUsage {
2342                                    prompt_tokens: response.usage.prompt_tokens,
2343                                    completion_tokens: response.usage.completion_tokens,
2344                                    total_tokens: response.usage.total_tokens,
2345                                    reasoning_tokens: response.usage.reasoning_tokens,
2346                                });
2347                            }
2348
2349                            let choice = response
2350                                .choices
2351                                .first()
2352                                .ok_or_else(|| "completion returned no choices".to_string())?;
2353                            streamed_content = choice.message.content.clone();
2354                            streamed_tool_calls = choice.message.tool_calls.clone();
2355                            finish_reason = choice.finish_reason;
2356                        }
2357                        CompletionOutcome::Stream(mut stream) => {
2358                            let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2359                                None;
2360                            let mut delta_filter = StructuredJsonDeltaFilter::default();
2361                            let include_raw_debug = include_raw_stream_debug_events();
2362                            let mut json_text_formatter = if request.stream_json_as_text {
2363                                Some(StreamJsonAsTextFormatter::default())
2364                            } else {
2365                                None
2366                            };
2367                            let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2368
2369                            while let Some(chunk_result) = stream.next().await {
2370                                if event_sink_is_cancelled(event_sink) {
2371                                    return Err(workflow_event_sink_cancelled_message().to_string());
2372                                }
2373
2374                                let chunk = chunk_result.map_err(|error| error.to_string())?;
2375                                if let Some(usage) = chunk.usage {
2376                                    final_stream_usage = Some(usage);
2377                                }
2378
2379                                if let Some(choice) = chunk.choices.first() {
2380                                    if let Some(chunk_finish_reason) = choice.finish_reason {
2381                                        finish_reason = chunk_finish_reason;
2382                                    }
2383
2384                                    if include_raw_debug {
2385                                        if let Some(reasoning_delta) =
2386                                            choice.delta.reasoning_content.as_ref()
2387                                        {
2388                                            if let Some(sink) = event_sink {
2389                                                sink.emit(&YamlWorkflowEvent {
2390                                                    event_type: "node_stream_thinking_delta"
2391                                                        .to_string(),
2392                                                    node_id: Some(request.node_id.clone()),
2393                                                    step_id: Some(request.node_id.clone()),
2394                                                    node_kind: Some("llm_call".to_string()),
2395                                                    streamable: Some(true),
2396                                                    message: None,
2397                                                    delta: Some(reasoning_delta.clone()),
2398                                                    token_kind: Some(
2399                                                        YamlWorkflowTokenKind::Thinking,
2400                                                    ),
2401                                                    is_terminal_node_token: Some(
2402                                                        request.is_terminal_node,
2403                                                    ),
2404                                                    elapsed_ms: None,
2405                                                    metadata: None,
2406                                                });
2407                                            }
2408                                        }
2409                                    }
2410
2411                                    if let Some(delta) = choice.delta.content.clone() {
2412                                        streamed_content.push_str(delta.as_str());
2413                                        let (output_delta, thinking_delta) = if expects_object {
2414                                            delta_filter.split(delta.as_str())
2415                                        } else {
2416                                            (Some(delta.clone()), None)
2417                                        };
2418                                        let rendered_output_delta = if let Some(output_chunk) =
2419                                            output_delta
2420                                        {
2421                                            if let Some(formatter) = json_text_formatter.as_mut() {
2422                                                formatter.push(output_chunk.as_str());
2423                                                formatter.emit_if_ready(delta_filter.completed())
2424                                            } else {
2425                                                Some(output_chunk)
2426                                            }
2427                                        } else {
2428                                            None
2429                                        };
2430
2431                                        if include_raw_debug {
2432                                            if let Some(sink) = event_sink {
2433                                                if let Some(raw_thinking_delta) =
2434                                                    thinking_delta.as_ref()
2435                                                {
2436                                                    sink.emit(&YamlWorkflowEvent {
2437                                                        event_type: "node_stream_thinking_delta"
2438                                                            .to_string(),
2439                                                        node_id: Some(request.node_id.clone()),
2440                                                        step_id: Some(request.node_id.clone()),
2441                                                        node_kind: Some("llm_call".to_string()),
2442                                                        streamable: Some(true),
2443                                                        message: None,
2444                                                        delta: Some(raw_thinking_delta.clone()),
2445                                                        token_kind: Some(
2446                                                            YamlWorkflowTokenKind::Thinking,
2447                                                        ),
2448                                                        is_terminal_node_token: Some(
2449                                                            request.is_terminal_node,
2450                                                        ),
2451                                                        elapsed_ms: None,
2452                                                        metadata: None,
2453                                                    });
2454                                                }
2455                                                if let Some(raw_output_delta) =
2456                                                    rendered_output_delta.as_ref()
2457                                                {
2458                                                    sink.emit(&YamlWorkflowEvent {
2459                                                        event_type: "node_stream_output_delta"
2460                                                            .to_string(),
2461                                                        node_id: Some(request.node_id.clone()),
2462                                                        step_id: Some(request.node_id.clone()),
2463                                                        node_kind: Some("llm_call".to_string()),
2464                                                        streamable: Some(true),
2465                                                        message: None,
2466                                                        delta: Some(raw_output_delta.clone()),
2467                                                        token_kind: Some(
2468                                                            YamlWorkflowTokenKind::Output,
2469                                                        ),
2470                                                        is_terminal_node_token: Some(
2471                                                            request.is_terminal_node,
2472                                                        ),
2473                                                        elapsed_ms: None,
2474                                                        metadata: None,
2475                                                    });
2476                                                }
2477                                            }
2478                                        }
2479
2480                                        if let Some(filtered_delta) = rendered_output_delta {
2481                                            if let Some(sink) = event_sink {
2482                                                sink.emit(&YamlWorkflowEvent {
2483                                                    event_type: "node_stream_delta".to_string(),
2484                                                    node_id: Some(request.node_id.clone()),
2485                                                    step_id: Some(request.node_id.clone()),
2486                                                    node_kind: Some("llm_call".to_string()),
2487                                                    streamable: Some(true),
2488                                                    message: None,
2489                                                    delta: Some(filtered_delta),
2490                                                    token_kind: Some(YamlWorkflowTokenKind::Output),
2491                                                    is_terminal_node_token: Some(
2492                                                        request.is_terminal_node,
2493                                                    ),
2494                                                    elapsed_ms: None,
2495                                                    metadata: None,
2496                                                });
2497                                            }
2498                                        }
2499                                    }
2500
2501                                    if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2502                                    {
2503                                        for tool_call_delta in tool_call_deltas {
2504                                            let entry = tool_calls_by_index
2505                                                .entry(tool_call_delta.index)
2506                                                .or_insert_with(|| ToolCall {
2507                                                    id: tool_call_delta.id.clone().unwrap_or_else(
2508                                                        || {
2509                                                            format!(
2510                                                                "tool_call_{}",
2511                                                                tool_call_delta.index
2512                                                            )
2513                                                        },
2514                                                    ),
2515                                                    tool_type: ToolType::Function,
2516                                                    function:
2517                                                        simple_agent_type::tool::ToolCallFunction {
2518                                                            name: String::new(),
2519                                                            arguments: String::new(),
2520                                                        },
2521                                                });
2522
2523                                            if let Some(id) = tool_call_delta.id.as_ref() {
2524                                                entry.id = id.clone();
2525                                            }
2526                                            if let Some(tool_type) = tool_call_delta.tool_type {
2527                                                entry.tool_type = tool_type;
2528                                            }
2529                                            if let Some(function_delta) =
2530                                                tool_call_delta.function.as_ref()
2531                                            {
2532                                                if let Some(name) = function_delta.name.as_ref() {
2533                                                    entry.function.name = name.clone();
2534                                                }
2535                                                if let Some(arguments) =
2536                                                    function_delta.arguments.as_ref()
2537                                                {
2538                                                    entry.function.arguments.push_str(arguments);
2539                                                }
2540                                            }
2541                                        }
2542                                    }
2543                                }
2544
2545                                if event_sink_is_cancelled(event_sink) {
2546                                    return Err(workflow_event_sink_cancelled_message().to_string());
2547                                }
2548                            }
2549
2550                            if let Some(usage) = final_stream_usage {
2551                                if let Some(total) = usage_total.as_mut() {
2552                                    total.prompt_tokens += usage.prompt_tokens;
2553                                    total.completion_tokens += usage.completion_tokens;
2554                                    total.total_tokens += usage.total_tokens;
2555                                    if let Some(reasoning_tokens) = usage.reasoning_tokens {
2556                                        total.reasoning_tokens = Some(
2557                                            total.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2558                                        );
2559                                    }
2560                                } else {
2561                                    usage_total = Some(YamlLlmTokenUsage {
2562                                        prompt_tokens: usage.prompt_tokens,
2563                                        completion_tokens: usage.completion_tokens,
2564                                        total_tokens: usage.total_tokens,
2565                                        reasoning_tokens: usage.reasoning_tokens,
2566                                    });
2567                                }
2568                            }
2569
2570                            let mut ordered_tool_calls =
2571                                tool_calls_by_index.into_iter().collect::<Vec<_>>();
2572                            ordered_tool_calls.sort_by_key(|(index, _)| *index);
2573                            if !ordered_tool_calls.is_empty() {
2574                                streamed_tool_calls = Some(
2575                                    ordered_tool_calls
2576                                        .into_iter()
2577                                        .map(|(_, tool_call)| tool_call)
2578                                        .collect::<Vec<_>>(),
2579                                );
2580                            }
2581                        }
2582                    }
2583
2584                    let has_tool_calls = streamed_tool_calls
2585                        .as_ref()
2586                        .is_some_and(|calls| !calls.is_empty());
2587                    if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
2588                        let payload = if expects_object {
2589                            parse_streamed_structured_payload(
2590                                streamed_content.as_str(),
2591                                request.heal,
2592                            )
2593                            .map_err(|error| {
2594                                format!("failed to parse structured completion JSON: {error}")
2595                            })?
2596                            .payload
2597                        } else {
2598                            Value::String(streamed_content.clone())
2599                        };
2600                        return Ok(YamlLlmExecutionResult {
2601                            payload,
2602                            usage: usage_total,
2603                            ttft_ms: None,
2604                            tool_calls: tool_traces,
2605                        });
2606                    }
2607
2608                    if roundtrip >= request.max_tool_roundtrips {
2609                        return Err(format!(
2610                            "tool call roundtrip limit reached for node '{}' (max={})",
2611                            request.node_id, request.max_tool_roundtrips
2612                        ));
2613                    }
2614
2615                    let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
2616                        "finish_reason=tool_calls but no tool calls found".to_string()
2617                    })?;
2618                    if tool_calls
2619                        .iter()
2620                        .any(|tool_call| tool_call.function.name.trim().is_empty())
2621                    {
2622                        return Err("streamed tool call missing function name".to_string());
2623                    }
2624
2625                    let assistant_tool_message =
2626                        Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
2627                    conversation.push(assistant_tool_message);
2628
2629                    for tool_call in tool_calls {
2630                        let tool_call_id = tool_call.id.clone();
2631                        let tool_name = tool_call.function.name.clone();
2632                        let tool_started = Instant::now();
2633                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
2634                            .map_err(|error| {
2635                                format!(
2636                                    "tool '{}' arguments must be valid JSON: {}",
2637                                    tool_name, error
2638                                )
2639                            })?;
2640                        let mut tool_span_context: Option<TraceContext> = None;
2641                        let mut tool_span = if request.trace_sampled {
2642                            let (span_context, mut span) = workflow_tracer().start_span(
2643                                "workflow.tool.execute",
2644                                SpanKind::Node,
2645                                request.trace_context.as_ref(),
2646                            );
2647                            tool_span_context = Some(span_context);
2648                            apply_trace_identity_attributes(
2649                                span.as_mut(),
2650                                request.trace_id.as_deref(),
2651                            );
2652                            apply_trace_tenant_attributes_from_tenant(
2653                                span.as_mut(),
2654                                &request.tenant_context,
2655                            );
2656                            span.set_attribute("node_id", request.node_id.as_str());
2657                            span.set_attribute("node_kind", "llm_call");
2658                            span.set_attribute("tool_name", tool_name.as_str());
2659                            span.set_attribute("tool_call_id", tool_call_id.as_str());
2660                            let args_for_span =
2661                                payload_for_tool_trace(request.tool_trace_mode, &arguments)
2662                                    .to_string();
2663                            span.set_attribute("tool_arguments", args_for_span.as_str());
2664                            Some(span)
2665                        } else {
2666                            None
2667                        };
2668
2669                        if request.tool_trace_mode != YamlToolTraceMode::Off {
2670                            if let Some(sink) = event_sink {
2671                                sink.emit(&YamlWorkflowEvent {
2672                                    event_type: "node_tool_call_requested".to_string(),
2673                                    node_id: Some(request.node_id.clone()),
2674                                    step_id: Some(request.node_id.clone()),
2675                                    node_kind: Some("llm_call".to_string()),
2676                                    streamable: Some(false),
2677                                    message: Some(format!(
2678                                        "tool call requested: {}",
2679                                        tool_name
2680                                    )),
2681                                    delta: None,
2682                                    token_kind: None,
2683                                    is_terminal_node_token: None,
2684                                    elapsed_ms: None,
2685                                    metadata: Some(json!({
2686                                        "tool_call_id": tool_call_id.clone(),
2687                                        "tool_name": tool_name.clone(),
2688                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2689                                    })),
2690                                });
2691                            }
2692                        }
2693
2694                        let Some(tool_config) = request
2695                            .tools
2696                            .iter()
2697                            .find(|tool| tool.definition.function.name == tool_name)
2698                        else {
2699                            return Err(format!("model requested unknown tool '{}'", tool_name));
2700                        };
2701
2702                        let tool_output_result = if tool_name == "run_workflow_graph" {
2703                            execute_subworkflow_tool_call(
2704                                &arguments,
2705                                &request.execution_context,
2706                                self.client,
2707                                self.custom_worker,
2708                                &self.run_options,
2709                                tool_span_context.as_ref(),
2710                                request.trace_id.as_deref(),
2711                            )
2712                            .await
2713                        } else if let Some(custom_worker) = self.custom_worker {
2714                            custom_worker
2715                                .execute(
2716                                    tool_name.as_str(),
2717                                    &arguments,
2718                                    request.email_text.as_str(),
2719                                    &request.execution_context,
2720                                )
2721                                .await
2722                        } else {
2723                            Err(format!(
2724                                "tool '{}' requested but no custom worker executor is configured",
2725                                tool_name
2726                            ))
2727                        };
2728
2729                        let tool_output = match tool_output_result {
2730                            Ok(output) => output,
2731                            Err(message) => {
2732                                let elapsed_ms = tool_started.elapsed().as_millis();
2733                                if let Some(span) = tool_span.as_mut() {
2734                                    span.add_event("workflow.tool.execute.error");
2735                                    span.set_attribute("tool_status", "error");
2736                                    span.set_attribute("tool_error", message.as_str());
2737                                    span.set_attribute(
2738                                        "elapsed_ms",
2739                                        elapsed_ms.to_string().as_str(),
2740                                    );
2741                                }
2742                                if request.tool_trace_mode != YamlToolTraceMode::Off {
2743                                    if let Some(sink) = event_sink {
2744                                        sink.emit(&YamlWorkflowEvent {
2745                                            event_type: "node_tool_call_failed".to_string(),
2746                                            node_id: Some(request.node_id.clone()),
2747                                            step_id: Some(request.node_id.clone()),
2748                                            node_kind: Some("llm_call".to_string()),
2749                                            streamable: Some(false),
2750                                            message: Some(message.clone()),
2751                                            delta: None,
2752                                            token_kind: None,
2753                                            is_terminal_node_token: None,
2754                                            elapsed_ms: Some(elapsed_ms),
2755                                            metadata: Some(json!({
2756                                                "tool_call_id": tool_call_id.clone(),
2757                                                "tool_name": tool_name.clone(),
2758                                            })),
2759                                        });
2760                                    }
2761                                }
2762                                tool_traces.push(YamlToolCallTrace {
2763                                    id: tool_call_id.clone(),
2764                                    name: tool_name.clone(),
2765                                    arguments,
2766                                    output: None,
2767                                    status: "error".to_string(),
2768                                    elapsed_ms,
2769                                    error: Some(message.clone()),
2770                                });
2771                                if let Some(span) = tool_span.take() {
2772                                    span.end();
2773                                }
2774                                return Err(format!("tool '{}' failed: {}", tool_name, message));
2775                            }
2776                        };
2777
2778                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
2779                            validate_schema_instance(output_schema, &tool_output).map_err(
2780                                |message| {
2781                                    format!(
2782                                        "tool '{}' output failed schema validation: {}",
2783                                        tool_name, message
2784                                    )
2785                                },
2786                            )?;
2787                        }
2788
2789                        let elapsed_ms = tool_started.elapsed().as_millis();
2790                        if let Some(span) = tool_span.as_mut() {
2791                            span.add_event("workflow.tool.execute.completed");
2792                            span.set_attribute("tool_status", "ok");
2793                            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
2794                            let output_for_span =
2795                                payload_for_tool_trace(request.tool_trace_mode, &tool_output)
2796                                    .to_string();
2797                            span.set_attribute("tool_output", output_for_span.as_str());
2798                        }
2799                        if request.tool_trace_mode != YamlToolTraceMode::Off {
2800                            if let Some(sink) = event_sink {
2801                                sink.emit(&YamlWorkflowEvent {
2802                                    event_type: "node_tool_call_completed".to_string(),
2803                                    node_id: Some(request.node_id.clone()),
2804                                    step_id: Some(request.node_id.clone()),
2805                                    node_kind: Some("llm_call".to_string()),
2806                                    streamable: Some(false),
2807                                    message: Some(format!(
2808                                        "tool call completed: {}",
2809                                        tool_name
2810                                    )),
2811                                    delta: None,
2812                                    token_kind: None,
2813                                    is_terminal_node_token: None,
2814                                    elapsed_ms: Some(elapsed_ms),
2815                                    metadata: Some(json!({
2816                                        "tool_call_id": tool_call_id.clone(),
2817                                        "tool_name": tool_name.clone(),
2818                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2819                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
2820                                    })),
2821                                });
2822                            }
2823                        }
2824
2825                        tool_traces.push(YamlToolCallTrace {
2826                            id: tool_call_id.clone(),
2827                            name: tool_name.clone(),
2828                            arguments: arguments.clone(),
2829                            output: Some(tool_output.clone()),
2830                            status: "ok".to_string(),
2831                            elapsed_ms,
2832                            error: None,
2833                        });
2834
2835                        conversation.push(Message::tool(
2836                            serde_json::to_string(&tool_output).map_err(|error| {
2837                                format!("failed to serialize tool output: {error}")
2838                            })?,
2839                            tool_call_id,
2840                        ));
2841                        if let Some(span) = tool_span.take() {
2842                            span.end();
2843                        }
2844                    }
2845
2846                    if request.tool_trace_mode != YamlToolTraceMode::Off {
2847                        if let Some(sink) = event_sink {
2848                            sink.emit(&YamlWorkflowEvent {
2849                                event_type: "node_tool_roundtrip_completed".to_string(),
2850                                node_id: Some(request.node_id.clone()),
2851                                step_id: Some(request.node_id.clone()),
2852                                node_kind: Some("llm_call".to_string()),
2853                                streamable: Some(false),
2854                                message: Some(format!(
2855                                    "tool roundtrip {} completed",
2856                                    roundtrip + 1
2857                                )),
2858                                delta: None,
2859                                token_kind: None,
2860                                is_terminal_node_token: None,
2861                                elapsed_ms: None,
2862                                metadata: Some(json!({
2863                                    "roundtrip": roundtrip + 1,
2864                                    "max_tool_roundtrips": request.max_tool_roundtrips,
2865                                })),
2866                            });
2867                        }
2868                    }
2869                }
2870
2871                return Err(format!(
2872                    "tool-enabled llm_call '{}' exhausted loop without final payload",
2873                    request.node_id
2874                ));
2875            }
2876
2877            let mut builder = CompletionRequest::builder()
2878                .model(&request.model)
2879                .messages(messages);
2880
2881            if request.heal && !request.stream && expects_object {
2882                builder = builder.json_schema("workflow_step", request.schema.clone());
2883            }
2884
2885            if request.stream {
2886                builder = builder.stream(true);
2887            }
2888
2889            let completion_request = builder
2890                .build()
2891                .map_err(|error| format!("failed to build completion request: {error}"))?;
2892
2893            let completion_options = if request.heal && !request.stream && expects_object {
2894                CompletionOptions {
2895                    mode: CompletionMode::HealedJson,
2896                }
2897            } else {
2898                CompletionOptions::default()
2899            };
2900
2901            let outcome = self
2902                .client
2903                .complete(&completion_request, completion_options)
2904                .await
2905                .map_err(|error| error.to_string())?;
2906
2907            match outcome {
2908                CompletionOutcome::Stream(mut stream) => {
2909                    let mut aggregated = String::new();
2910                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
2911                    let stream_started = Instant::now();
2912                    let mut ttft_ms: Option<u128> = None;
2913                    let mut delta_filter = StructuredJsonDeltaFilter::default();
2914                    let include_raw_debug = include_raw_stream_debug_events();
2915                    let mut json_text_formatter = if request.stream_json_as_text {
2916                        Some(StreamJsonAsTextFormatter::default())
2917                    } else {
2918                        None
2919                    };
2920                    while let Some(chunk_result) = stream.next().await {
2921                        if event_sink_is_cancelled(event_sink) {
2922                            return Err(workflow_event_sink_cancelled_message().to_string());
2923                        }
2924                        let chunk = chunk_result.map_err(|error| error.to_string())?;
2925                        if let Some(usage) = chunk.usage {
2926                            final_stream_usage = Some(usage);
2927                        }
2928                        if let Some(choice) = chunk.choices.first() {
2929                            if ttft_ms.is_none()
2930                                && (choice
2931                                    .delta
2932                                    .content
2933                                    .as_ref()
2934                                    .is_some_and(|delta| !delta.is_empty())
2935                                    || choice
2936                                        .delta
2937                                        .reasoning_content
2938                                        .as_ref()
2939                                        .is_some_and(|delta| !delta.is_empty()))
2940                            {
2941                                ttft_ms = Some(stream_started.elapsed().as_millis());
2942                            }
2943                            if include_raw_debug {
2944                                if let Some(reasoning_delta) =
2945                                    choice.delta.reasoning_content.as_ref()
2946                                {
2947                                    if let Some(sink) = event_sink {
2948                                        sink.emit(&YamlWorkflowEvent {
2949                                            event_type: "node_stream_thinking_delta".to_string(),
2950                                            node_id: Some(request.node_id.clone()),
2951                                            step_id: Some(request.node_id.clone()),
2952                                            node_kind: Some("llm_call".to_string()),
2953                                            streamable: Some(true),
2954                                            message: None,
2955                                            delta: Some(reasoning_delta.clone()),
2956                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
2957                                            is_terminal_node_token: Some(request.is_terminal_node),
2958                                            elapsed_ms: None,
2959                                            metadata: None,
2960                                        });
2961                                    }
2962                                }
2963                            }
2964                            if let Some(delta) = choice.delta.content.clone() {
2965                                aggregated.push_str(delta.as_str());
2966                                let (output_delta, thinking_delta) = if expects_object {
2967                                    delta_filter.split(delta.as_str())
2968                                } else {
2969                                    (Some(delta.clone()), None)
2970                                };
2971                                let rendered_output_delta = if let Some(output_chunk) = output_delta
2972                                {
2973                                    if let Some(formatter) = json_text_formatter.as_mut() {
2974                                        formatter.push(output_chunk.as_str());
2975                                        formatter.emit_if_ready(delta_filter.completed())
2976                                    } else {
2977                                        Some(output_chunk)
2978                                    }
2979                                } else {
2980                                    None
2981                                };
2982                                if include_raw_debug {
2983                                    if let Some(sink) = event_sink {
2984                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
2985                                            sink.emit(&YamlWorkflowEvent {
2986                                                event_type: "node_stream_thinking_delta"
2987                                                    .to_string(),
2988                                                node_id: Some(request.node_id.clone()),
2989                                                step_id: Some(request.node_id.clone()),
2990                                                node_kind: Some("llm_call".to_string()),
2991                                                streamable: Some(true),
2992                                                message: None,
2993                                                delta: Some(raw_thinking_delta.clone()),
2994                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
2995                                                is_terminal_node_token: Some(
2996                                                    request.is_terminal_node,
2997                                                ),
2998                                                elapsed_ms: None,
2999                                                metadata: None,
3000                                            });
3001                                        }
3002                                        if let Some(raw_output_delta) =
3003                                            rendered_output_delta.as_ref()
3004                                        {
3005                                            sink.emit(&YamlWorkflowEvent {
3006                                                event_type: "node_stream_output_delta".to_string(),
3007                                                node_id: Some(request.node_id.clone()),
3008                                                step_id: Some(request.node_id.clone()),
3009                                                node_kind: Some("llm_call".to_string()),
3010                                                streamable: Some(true),
3011                                                message: None,
3012                                                delta: Some(raw_output_delta.clone()),
3013                                                token_kind: Some(YamlWorkflowTokenKind::Output),
3014                                                is_terminal_node_token: Some(
3015                                                    request.is_terminal_node,
3016                                                ),
3017                                                elapsed_ms: None,
3018                                                metadata: None,
3019                                            });
3020                                        }
3021                                    }
3022                                }
3023                                if let Some(filtered_delta) = rendered_output_delta {
3024                                    if let Some(sink) = event_sink {
3025                                        sink.emit(&YamlWorkflowEvent {
3026                                            event_type: "node_stream_delta".to_string(),
3027                                            node_id: Some(request.node_id.clone()),
3028                                            step_id: Some(request.node_id.clone()),
3029                                            node_kind: Some("llm_call".to_string()),
3030                                            streamable: Some(true),
3031                                            message: None,
3032                                            delta: Some(filtered_delta),
3033                                            token_kind: Some(YamlWorkflowTokenKind::Output),
3034                                            is_terminal_node_token: Some(request.is_terminal_node),
3035                                            elapsed_ms: None,
3036                                            metadata: None,
3037                                        });
3038                                    }
3039                                }
3040                            }
3041                        }
3042
3043                        if event_sink_is_cancelled(event_sink) {
3044                            return Err(workflow_event_sink_cancelled_message().to_string());
3045                        }
3046                    }
3047
3048                    let payload = if expects_object {
3049                        let resolved =
3050                            parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
3051                        if let Some(confidence) = resolved.heal_confidence {
3052                            if let Some(sink) = event_sink {
3053                                sink.emit(&YamlWorkflowEvent {
3054                                    event_type: "node_healed".to_string(),
3055                                    node_id: Some(request.node_id.clone()),
3056                                    step_id: Some(request.node_id.clone()),
3057                                    node_kind: Some("llm_call".to_string()),
3058                                    streamable: Some(true),
3059                                    message: Some(format!(
3060                                        "healed streamed structured response confidence={confidence}"
3061                                    )),
3062                                    delta: None,
3063                                    token_kind: None,
3064                                    is_terminal_node_token: None,
3065                                    elapsed_ms: None,
3066                                    metadata: None,
3067                                });
3068                            }
3069                        }
3070                        resolved.payload
3071                    } else {
3072                        Value::String(aggregated)
3073                    };
3074
3075                    Ok(YamlLlmExecutionResult {
3076                        payload,
3077                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
3078                            prompt_tokens: usage.prompt_tokens,
3079                            completion_tokens: usage.completion_tokens,
3080                            total_tokens: usage.total_tokens,
3081                            reasoning_tokens: usage.reasoning_tokens,
3082                        }),
3083                        ttft_ms,
3084                        tool_calls: Vec::new(),
3085                    })
3086                }
3087                CompletionOutcome::Response(response) => {
3088                    let payload = if expects_object {
3089                        let content = response
3090                            .content()
3091                            .ok_or_else(|| "completion returned empty content".to_string())?;
3092                        serde_json::from_str(content).map_err(|error| {
3093                            format!("failed to parse structured completion JSON: {error}")
3094                        })?
3095                    } else {
3096                        Value::String(response.content().unwrap_or_default().to_string())
3097                    };
3098
3099                    Ok(YamlLlmExecutionResult {
3100                        payload,
3101                        usage: Some(YamlLlmTokenUsage {
3102                            prompt_tokens: response.usage.prompt_tokens,
3103                            completion_tokens: response.usage.completion_tokens,
3104                            total_tokens: response.usage.total_tokens,
3105                            reasoning_tokens: response.usage.reasoning_tokens,
3106                        }),
3107                        ttft_ms: None,
3108                        tool_calls: Vec::new(),
3109                    })
3110                }
3111                CompletionOutcome::HealedJson(healed) => {
3112                    if !expects_object {
3113                        return Err(
3114                            "healed json outcome is unsupported for non-object schema".to_string()
3115                        );
3116                    }
3117                    if let Some(sink) = event_sink {
3118                        sink.emit(&YamlWorkflowEvent {
3119                            event_type: "node_healed".to_string(),
3120                            node_id: Some(request.node_id.clone()),
3121                            step_id: Some(request.node_id.clone()),
3122                            node_kind: Some("llm_call".to_string()),
3123                            streamable: Some(request.stream),
3124                            message: Some(format!(
3125                                "healed structured response confidence={}",
3126                                healed.parsed.confidence
3127                            )),
3128                            delta: None,
3129                            token_kind: None,
3130                            is_terminal_node_token: None,
3131                            elapsed_ms: None,
3132                            metadata: None,
3133                        });
3134                    }
3135                    Ok(YamlLlmExecutionResult {
3136                        payload: healed.parsed.value,
3137                        usage: Some(YamlLlmTokenUsage {
3138                            prompt_tokens: healed.response.usage.prompt_tokens,
3139                            completion_tokens: healed.response.usage.completion_tokens,
3140                            total_tokens: healed.response.usage.total_tokens,
3141                            reasoning_tokens: healed.response.usage.reasoning_tokens,
3142                        }),
3143                        ttft_ms: None,
3144                        tool_calls: Vec::new(),
3145                    })
3146                }
3147                CompletionOutcome::CoercedSchema(coerced) => {
3148                    if !expects_object {
3149                        return Err(
3150                            "coerced schema outcome is unsupported for non-object schema"
3151                                .to_string(),
3152                        );
3153                    }
3154                    Ok(YamlLlmExecutionResult {
3155                        payload: coerced.coerced.value,
3156                        usage: Some(YamlLlmTokenUsage {
3157                            prompt_tokens: coerced.response.usage.prompt_tokens,
3158                            completion_tokens: coerced.response.usage.completion_tokens,
3159                            total_tokens: coerced.response.usage.total_tokens,
3160                            reasoning_tokens: coerced.response.usage.reasoning_tokens,
3161                        }),
3162                        ttft_ms: None,
3163                        tool_calls: Vec::new(),
3164                    })
3165                }
3166            }
3167        }
3168    }
3169
3170    let executor = BorrowedClientExecutor {
3171        client,
3172        custom_worker,
3173        run_options: options.clone(),
3174    };
3175    run_workflow_yaml_with_custom_worker_and_events_and_options(
3176        workflow,
3177        workflow_input,
3178        &executor,
3179        custom_worker,
3180        event_sink,
3181        options,
3182    )
3183    .await
3184}
3185
3186pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
3187    workflow: &YamlWorkflow,
3188    email_text: &str,
3189    client: &SimpleAgentsClient,
3190    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3191    event_sink: Option<&dyn YamlWorkflowEventSink>,
3192) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3193    let workflow_input = json!({ "email_text": email_text });
3194    run_workflow_yaml_with_client_and_custom_worker_and_events(
3195        workflow,
3196        &workflow_input,
3197        client,
3198        custom_worker,
3199        event_sink,
3200    )
3201    .await
3202}
3203
3204pub async fn run_workflow_yaml(
3205    workflow: &YamlWorkflow,
3206    workflow_input: &Value,
3207    executor: &dyn YamlWorkflowLlmExecutor,
3208) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3209    run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3210        .await
3211}
3212
3213pub async fn run_email_workflow_yaml(
3214    workflow: &YamlWorkflow,
3215    email_text: &str,
3216    executor: &dyn YamlWorkflowLlmExecutor,
3217) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3218    let workflow_input = json!({ "email_text": email_text });
3219    run_workflow_yaml(workflow, &workflow_input, executor).await
3220}
3221
3222pub async fn run_workflow_yaml_with_custom_worker(
3223    workflow: &YamlWorkflow,
3224    workflow_input: &Value,
3225    executor: &dyn YamlWorkflowLlmExecutor,
3226    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3227) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3228    run_workflow_yaml_with_custom_worker_and_events(
3229        workflow,
3230        workflow_input,
3231        executor,
3232        custom_worker,
3233        None,
3234    )
3235    .await
3236}
3237
3238pub async fn run_email_workflow_yaml_with_custom_worker(
3239    workflow: &YamlWorkflow,
3240    email_text: &str,
3241    executor: &dyn YamlWorkflowLlmExecutor,
3242    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3243) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3244    let workflow_input = json!({ "email_text": email_text });
3245    run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3246}
3247
3248pub async fn run_workflow_yaml_with_custom_worker_and_events(
3249    workflow: &YamlWorkflow,
3250    workflow_input: &Value,
3251    executor: &dyn YamlWorkflowLlmExecutor,
3252    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3253    event_sink: Option<&dyn YamlWorkflowEventSink>,
3254) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3255    run_workflow_yaml_with_custom_worker_and_events_and_options(
3256        workflow,
3257        workflow_input,
3258        executor,
3259        custom_worker,
3260        event_sink,
3261        &YamlWorkflowRunOptions::default(),
3262    )
3263    .await
3264}
3265
3266pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
3267    workflow: &YamlWorkflow,
3268    workflow_input: &Value,
3269    executor: &dyn YamlWorkflowLlmExecutor,
3270    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3271    event_sink: Option<&dyn YamlWorkflowEventSink>,
3272    options: &YamlWorkflowRunOptions,
3273) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3274    if !workflow_input.is_object() {
3275        return Err(YamlWorkflowRunError::InvalidInput {
3276            message: "workflow input must be a JSON object".to_string(),
3277        });
3278    }
3279
3280    validate_sample_rate(options.telemetry.sample_rate)?;
3281
3282    let email_text = workflow_input
3283        .get("email_text")
3284        .and_then(Value::as_str)
3285        .unwrap_or_default();
3286
3287    let diagnostics = verify_yaml_workflow(workflow);
3288    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
3289        .iter()
3290        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
3291        .cloned()
3292        .collect();
3293    if !errors.is_empty() {
3294        return Err(YamlWorkflowRunError::Validation {
3295            diagnostics_count: errors.len(),
3296            diagnostics: errors,
3297        });
3298    }
3299
3300    if let Some(output) =
3301        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
3302            .await?
3303    {
3304        return Ok(output);
3305    }
3306
3307    let parent_trace_context = trace_context_from_options(options);
3308    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3309
3310    let tracer = workflow_tracer();
3311    let mut workflow_span_context: Option<TraceContext> = None;
3312    let mut workflow_span = if telemetry_context.sampled {
3313        let (span_context, mut span) = tracer.start_span(
3314            "workflow.run",
3315            SpanKind::Workflow,
3316            parent_trace_context.as_ref(),
3317        );
3318        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
3319        apply_trace_tenant_attributes(span.as_mut(), options);
3320        workflow_span_context = Some(span_context);
3321        Some(span)
3322    } else {
3323        None
3324    };
3325
3326    if workflow.nodes.is_empty() {
3327        return Err(YamlWorkflowRunError::EmptyNodes {
3328            workflow_id: workflow.id.clone(),
3329        });
3330    }
3331
3332    let node_map: HashMap<&str, &YamlNode> = workflow
3333        .nodes
3334        .iter()
3335        .map(|node| (node.id.as_str(), node))
3336        .collect();
3337    if !node_map.contains_key(workflow.entry_node.as_str()) {
3338        return Err(YamlWorkflowRunError::MissingEntry {
3339            entry_node: workflow.entry_node.clone(),
3340        });
3341    }
3342
3343    let edge_map: HashMap<&str, &str> = workflow
3344        .edges
3345        .iter()
3346        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
3347        .collect();
3348
3349    let mut current = workflow.entry_node.clone();
3350    let mut trace = Vec::new();
3351    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
3352    let mut globals = serde_json::Map::new();
3353    let mut step_timings = Vec::new();
3354    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
3355    let mut llm_node_models: BTreeMap<String, String> = BTreeMap::new();
3356    let mut token_totals = YamlTokenTotals::default();
3357    let mut workflow_ttft_ms: Option<u128> = None;
3358    let started = Instant::now();
3359
3360    if let Some(sink) = event_sink {
3361        sink.emit(&YamlWorkflowEvent {
3362            event_type: "workflow_started".to_string(),
3363            node_id: None,
3364            step_id: None,
3365            node_kind: None,
3366            streamable: None,
3367            message: Some(format!("workflow_id={}", workflow.id)),
3368            delta: None,
3369            token_kind: None,
3370            is_terminal_node_token: None,
3371            elapsed_ms: Some(0),
3372            metadata: None,
3373        });
3374    }
3375
3376    if event_sink_is_cancelled(event_sink) {
3377        return Err(YamlWorkflowRunError::EventSinkCancelled {
3378            message: workflow_event_sink_cancelled_message().to_string(),
3379        });
3380    }
3381
3382    loop {
3383        if event_sink_is_cancelled(event_sink) {
3384            return Err(YamlWorkflowRunError::EventSinkCancelled {
3385                message: workflow_event_sink_cancelled_message().to_string(),
3386            });
3387        }
3388
3389        let node =
3390            *node_map
3391                .get(current.as_str())
3392                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
3393                    node_id: current.clone(),
3394                })?;
3395
3396        trace.push(node.id.clone());
3397        let step_started = Instant::now();
3398
3399        let mut node_span_context: Option<TraceContext> = None;
3400        let mut node_span = if telemetry_context.sampled {
3401            let (span_context, mut span) = tracer.start_span(
3402                "workflow.node.execute",
3403                SpanKind::Node,
3404                workflow_span_context.as_ref(),
3405            );
3406            node_span_context = Some(span_context);
3407            apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
3408            apply_trace_tenant_attributes(span.as_mut(), options);
3409            span.set_attribute("node_id", node.id.as_str());
3410            span.set_attribute("node_kind", node.kind_name());
3411            if node.kind_name() == "llm_call" {
3412                span.set_attribute("langfuse.observation.type", "generation");
3413            }
3414            Some(span)
3415        } else {
3416            None
3417        };
3418
3419        let node_streamable = node
3420            .node_type
3421            .llm_call
3422            .as_ref()
3423            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
3424        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();
3425
3426        if let Some(sink) = event_sink {
3427            sink.emit(&YamlWorkflowEvent {
3428                event_type: "node_started".to_string(),
3429                node_id: Some(node.id.clone()),
3430                step_id: Some(node.id.clone()),
3431                node_kind: Some(node.kind_name().to_string()),
3432                streamable: node_streamable,
3433                message: if node_streamable == Some(false) {
3434                    Some("Node is not streamable; status events only".to_string())
3435                } else {
3436                    None
3437                },
3438                delta: None,
3439                token_kind: None,
3440                is_terminal_node_token: None,
3441                elapsed_ms: Some(workflow_elapsed_before_node_ms),
3442                metadata: None,
3443            });
3444        }
3445
3446        if event_sink_is_cancelled(event_sink) {
3447            return Err(YamlWorkflowRunError::EventSinkCancelled {
3448                message: workflow_event_sink_cancelled_message().to_string(),
3449            });
3450        }
3451
3452        let mut node_usage: Option<YamlLlmTokenUsage> = None;
3453        let mut node_model_name: Option<String> = None;
3454        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
3455        let next = if let Some(llm) = &node.node_type.llm_call {
3456            let prompt_template = node
3457                .config
3458                .as_ref()
3459                .and_then(|cfg| cfg.prompt.as_deref())
3460                .unwrap_or_default();
3461            let context = json!({
3462                "input": workflow_input,
3463                "nodes": outputs,
3464                "globals": Value::Object(globals.clone())
3465            });
3466            let messages = if let Some(path) = llm.messages_path.as_deref() {
3467                Some(
3468                    parse_messages_from_context(path, &context).map_err(|message| {
3469                        YamlWorkflowRunError::Llm {
3470                            node_id: node.id.clone(),
3471                            message,
3472                        }
3473                    })?,
3474                )
3475            } else {
3476                None
3477            };
3478            let prompt_bindings = collect_template_bindings(prompt_template, &context);
3479            let prompt = interpolate_template(prompt_template, &context);
3480            let schema = llm_output_schema_for_node(node);
3481
3482            let request = YamlLlmExecutionRequest {
3483                node_id: node.id.clone(),
3484                is_terminal_node,
3485                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
3486                model: resolve_requested_model(options.model.as_deref(), &llm.model),
3487                messages,
3488                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
3489                prompt,
3490                prompt_template: prompt_template.to_string(),
3491                prompt_bindings,
3492                schema,
3493                stream: llm.stream.unwrap_or(false),
3494                heal: llm.heal.unwrap_or(false),
3495                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
3496                    node_id: node.id.clone(),
3497                    message,
3498                })?,
3499                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
3500                    YamlWorkflowRunError::Llm {
3501                        node_id: node.id.clone(),
3502                        message,
3503                    }
3504                })?,
3505                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
3506                tool_calls_global_key: llm.tool_calls_global_key.clone(),
3507                tool_trace_mode: options.telemetry.tool_trace_mode,
3508                execution_context: context.clone(),
3509                email_text: email_text.to_string(),
3510                trace_id: telemetry_context.trace_id.clone(),
3511                trace_context: node_span_context.clone(),
3512                tenant_context: options.trace.tenant.clone(),
3513                trace_sampled: telemetry_context.sampled,
3514            };
3515
3516            if let Some(span) = node_span.as_mut() {
3517                let node_input = payload_for_span(options.telemetry.payload_mode, &context);
3518                span.set_attribute("node_input", node_input.as_str());
3519                span.set_attribute("langfuse.observation.input", node_input.as_str());
3520            }
3521
3522            if let Some(sink) = event_sink {
3523                sink.emit(&YamlWorkflowEvent {
3524                    event_type: "node_llm_input_resolved".to_string(),
3525                    node_id: Some(node.id.clone()),
3526                    step_id: Some(node.id.clone()),
3527                    node_kind: Some("llm_call".to_string()),
3528                    streamable: Some(request.stream),
3529                    message: Some("resolved llm input for telemetry".to_string()),
3530                    delta: None,
3531                    token_kind: None,
3532                    is_terminal_node_token: None,
3533                    elapsed_ms: Some(started.elapsed().as_millis()),
3534                    metadata: Some(json!({
3535                        "model": request.model.clone(),
3536                        "stream_requested": request.stream,
3537                        "stream_json_as_text": request.stream_json_as_text,
3538                        "heal_requested": request.heal,
3539                        "effective_stream": request.stream,
3540                        "prompt_template": request.prompt_template.clone(),
3541                        "prompt": request.prompt.clone(),
3542                        "schema": request.schema.clone(),
3543                        "bindings": request.prompt_bindings.clone(),
3544                        "tools_count": request.tools.len(),
3545                        "max_tool_roundtrips": request.max_tool_roundtrips,
3546                    })),
3547                });
3548            }
3549
3550            node_model_name = Some(request.model.clone());
3551            llm_node_models.insert(node.id.clone(), request.model.clone());
3552
3553            if event_sink_is_cancelled(event_sink) {
3554                return Err(YamlWorkflowRunError::EventSinkCancelled {
3555                    message: workflow_event_sink_cancelled_message().to_string(),
3556                });
3557            }
3558
3559            let llm_result = executor
3560                .complete_structured(request, event_sink)
3561                .await
3562                .map_err(|message| YamlWorkflowRunError::Llm {
3563                    node_id: node.id.clone(),
3564                    message,
3565                })?;
3566
3567            if let Some(usage) = llm_result.usage.as_ref() {
3568                token_totals.add_usage(usage);
3569            }
3570            if workflow_ttft_ms.is_none() {
3571                workflow_ttft_ms = llm_result
3572                    .ttft_ms
3573                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
3574            }
3575            node_usage = llm_result.usage;
3576
3577            let payload = llm_result.payload;
3578            let tool_calls = llm_result.tool_calls;
3579
3580            let mut node_output = json!({ "output": payload });
3581            if !tool_calls.is_empty() {
3582                if let Some(output_obj) = node_output.as_object_mut() {
3583                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
3584                }
3585            }
3586            outputs.insert(node.id.clone(), node_output);
3587            if let Some(span) = node_span.as_mut() {
3588                if let Some(output_payload) = outputs.get(node.id.as_str()) {
3589                    let node_output =
3590                        payload_for_span(options.telemetry.payload_mode, output_payload);
3591                    span.set_attribute("node_output", node_output.as_str());
3592                    span.set_attribute("langfuse.observation.output", node_output.as_str());
3593                }
3594            }
3595            apply_set_globals(node, &outputs, workflow_input, &mut globals);
3596            apply_update_globals(node, &outputs, workflow_input, &mut globals);
3597            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
3598                if let Some(node_tool_calls) = outputs
3599                    .get(node.id.as_str())
3600                    .and_then(|value| value.get("tool_calls"))
3601                    .cloned()
3602                {
3603                    globals.insert(global_key.clone(), node_tool_calls);
3604                }
3605            }
3606            edge_map
3607                .get(node.id.as_str())
3608                .map(|value| value.to_string())
3609        } else if let Some(switch) = &node.node_type.switch {
3610            let context = json!({
3611                "input": workflow_input,
3612                "nodes": outputs,
3613                "globals": Value::Object(globals.clone())
3614            });
3615            let mut chosen = Some(switch.default.clone());
3616            for branch in &switch.branches {
3617                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
3618                    chosen = Some(branch.target.clone());
3619                    break;
3620                }
3621            }
3622            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
3623                node_id: node.id.clone(),
3624            })?;
3625            Some(chosen)
3626        } else if let Some(custom) = &node.node_type.custom_worker {
3627            let payload = node
3628                .config
3629                .as_ref()
3630                .and_then(|cfg| cfg.payload.as_ref())
3631                .cloned()
3632                .unwrap_or_else(|| json!({}));
3633            let context = json!({
3634                "input": workflow_input,
3635                "nodes": outputs,
3636                "globals": Value::Object(globals.clone())
3637            });
3638
3639            if let Some(span) = node_span.as_mut() {
3640                span.set_attribute("handler_name", custom.handler.as_str());
3641                let node_input = payload_for_span(options.telemetry.payload_mode, &payload);
3642                span.set_attribute("node_input", node_input.as_str());
3643                span.set_attribute("langfuse.observation.input", node_input.as_str());
3644            }
3645
3646            let mut handler_span_context: Option<TraceContext> = None;
3647            let mut handler_span = if telemetry_context.sampled {
3648                let (span_context, mut span) = tracer.start_span(
3649                    "handler.invoke",
3650                    SpanKind::Node,
3651                    workflow_span_context.as_ref(),
3652                );
3653                handler_span_context = Some(span_context);
3654                apply_trace_identity_attributes(
3655                    span.as_mut(),
3656                    telemetry_context.trace_id.as_deref(),
3657                );
3658                span.set_attribute("handler_name", custom.handler.as_str());
3659                apply_trace_tenant_attributes(span.as_mut(), options);
3660                Some(span)
3661            } else {
3662                None
3663            };
3664
3665            let worker_trace_context = merged_trace_context_for_worker(
3666                handler_span_context.as_ref(),
3667                telemetry_context.trace_id.as_deref(),
3668                options,
3669            );
3670            let worker_context = custom_worker_context_with_trace(
3671                &context,
3672                &worker_trace_context,
3673                &options.trace.tenant,
3674            );
3675
3676            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
3677                custom_worker_executor
3678                    .execute(
3679                        custom.handler.as_str(),
3680                        &payload,
3681                        email_text,
3682                        &worker_context,
3683                    )
3684                    .await
3685                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
3686                        node_id: node.id.clone(),
3687                        message,
3688                    })
3689            } else {
3690                mock_custom_worker_output(custom.handler.as_str(), &payload)
3691            };
3692
3693            if let Some(span) = handler_span.take() {
3694                span.end();
3695            }
3696
3697            let worker_output = worker_output_result?;
3698
3699            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
3700            if let Some(span) = node_span.as_mut() {
3701                if let Some(output_payload) = outputs.get(node.id.as_str()) {
3702                    let node_output =
3703                        payload_for_span(options.telemetry.payload_mode, output_payload);
3704                    span.set_attribute("node_output", node_output.as_str());
3705                    span.set_attribute("langfuse.observation.output", node_output.as_str());
3706                }
3707            }
3708            apply_set_globals(node, &outputs, workflow_input, &mut globals);
3709            apply_update_globals(node, &outputs, workflow_input, &mut globals);
3710            edge_map
3711                .get(node.id.as_str())
3712                .map(|value| value.to_string())
3713        } else {
3714            return Err(YamlWorkflowRunError::UnsupportedNodeType {
3715                node_id: node.id.clone(),
3716            });
3717        };
3718
3719        let node_kind = node.kind_name().to_string();
3720        let elapsed_ms = step_started.elapsed().as_millis();
3721        step_timings.push(YamlStepTiming {
3722            node_id: node.id.clone(),
3723            node_kind,
3724            model_name: node_model_name.clone(),
3725            elapsed_ms,
3726            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
3727            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
3728            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
3729            reasoning_tokens: node_usage.as_ref().and_then(|usage| usage.reasoning_tokens),
3730            tokens_per_second: node_usage
3731                .as_ref()
3732                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
3733        });
3734
3735        if let Some(usage) = node_usage.as_ref() {
3736            llm_node_metrics.insert(
3737                node.id.clone(),
3738                YamlLlmNodeMetrics {
3739                    elapsed_ms,
3740                    prompt_tokens: usage.prompt_tokens,
3741                    completion_tokens: usage.completion_tokens,
3742                    total_tokens: usage.total_tokens,
3743                    reasoning_tokens: usage.reasoning_tokens,
3744                    tokens_per_second: completion_tokens_per_second(
3745                        usage.completion_tokens,
3746                        elapsed_ms,
3747                    ),
3748                },
3749            );
3750        }
3751
3752        if let Some(mut span) = node_span.take() {
3753            if let Some(model_name) = node_model_name.as_deref() {
3754                span.set_attribute("langfuse.observation.model.name", model_name);
3755                span.set_attribute("gen_ai.request.model", model_name);
3756            }
3757            if let Some(usage) = node_usage.as_ref() {
3758                apply_langfuse_observation_usage_attributes(span.as_mut(), usage);
3759            }
3760            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
3761            span.add_event("node_completed");
3762            span.end();
3763        }
3764
3765        if let Some(sink) = event_sink {
3766            sink.emit(&YamlWorkflowEvent {
3767                event_type: "node_completed".to_string(),
3768                node_id: Some(node.id.clone()),
3769                step_id: Some(node.id.clone()),
3770                node_kind: Some(node.kind_name().to_string()),
3771                streamable: node_streamable,
3772                message: None,
3773                delta: None,
3774                token_kind: None,
3775                is_terminal_node_token: None,
3776                elapsed_ms: Some(elapsed_ms),
3777                metadata: None,
3778            });
3779        }
3780
3781        if event_sink_is_cancelled(event_sink) {
3782            return Err(YamlWorkflowRunError::EventSinkCancelled {
3783                message: workflow_event_sink_cancelled_message().to_string(),
3784            });
3785        }
3786
3787        if let Some(next) = next {
3788            current = next;
3789            continue;
3790        }
3791        break;
3792    }
3793
3794    let terminal_node = trace
3795        .last()
3796        .cloned()
3797        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
3798            workflow_id: workflow.id.clone(),
3799        })?;
3800
3801    let terminal_output = outputs
3802        .get(terminal_node.as_str())
3803        .and_then(|value| value.get("output"))
3804        .cloned();
3805
3806    let total_elapsed_ms = started.elapsed().as_millis();
3807    let output = YamlWorkflowRunOutput {
3808        workflow_id: workflow.id.clone(),
3809        entry_node: workflow.entry_node.clone(),
3810        email_text: email_text.to_string(),
3811        trace,
3812        outputs,
3813        terminal_node,
3814        terminal_output,
3815        step_timings,
3816        llm_node_metrics,
3817        llm_node_models,
3818        total_elapsed_ms,
3819        ttft_ms: workflow_ttft_ms,
3820        total_input_tokens: token_totals.input_tokens,
3821        total_output_tokens: token_totals.output_tokens,
3822        total_tokens: token_totals.total_tokens,
3823        total_reasoning_tokens: token_totals.reasoning_tokens,
3824        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
3825        trace_id: telemetry_context.trace_id.clone(),
3826        metadata: telemetry_context.trace_id.as_ref().map(|value| {
3827            workflow_metadata_with_trace(
3828                options,
3829                value,
3830                telemetry_context.sampled,
3831                telemetry_context.trace_id_source,
3832            )
3833        }),
3834    };
3835
3836    if let Some(sink) = event_sink {
3837        let event_metadata = if options.telemetry.nerdstats {
3838            Some(json!({
3839                "nerdstats": workflow_nerdstats(&output),
3840            }))
3841        } else {
3842            None
3843        };
3844        sink.emit(&YamlWorkflowEvent {
3845            event_type: "workflow_completed".to_string(),
3846            node_id: None,
3847            step_id: None,
3848            node_kind: None,
3849            streamable: None,
3850            message: Some(format!("terminal_node={}", output.terminal_node)),
3851            delta: None,
3852            token_kind: None,
3853            is_terminal_node_token: None,
3854            elapsed_ms: Some(output.total_elapsed_ms),
3855            metadata: event_metadata,
3856        });
3857    }
3858
3859    if event_sink_is_cancelled(event_sink) {
3860        return Err(YamlWorkflowRunError::EventSinkCancelled {
3861            message: workflow_event_sink_cancelled_message().to_string(),
3862        });
3863    }
3864
3865    if let Some(mut span) = workflow_span.take() {
3866        span.set_attribute("workflow_id", workflow.id.as_str());
3867        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
3868        apply_langfuse_trace_input_output_attributes(
3869            span.as_mut(),
3870            workflow_input,
3871            &output,
3872            options.telemetry.payload_mode,
3873        );
3874        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
3875        span.end();
3876        flush_workflow_tracer();
3877    }
3878
3879    Ok(output)
3880}
3881
3882async fn try_run_yaml_via_ir_runtime(
3883    workflow: &YamlWorkflow,
3884    workflow_input: &Value,
3885    executor: &dyn YamlWorkflowLlmExecutor,
3886    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3887    options: &YamlWorkflowRunOptions,
3888) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
3889    let ir = match yaml_workflow_to_ir(workflow) {
3890        Ok(def) => def,
3891        Err(YamlToIrError::UnsupportedNode { .. })
3892        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
3893        Err(err) => {
3894            return Err(YamlWorkflowRunError::InvalidInput {
3895                message: err.to_string(),
3896            });
3897        }
3898    };
3899
3900    if validate_and_normalize(&ir).is_err() {
3901        return Ok(None);
3902    }
3903
3904    let parent_trace_context = trace_context_from_options(options);
3905    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3906
3907    let tracer = workflow_tracer();
3908    let mut workflow_span_context: Option<TraceContext> = None;
3909    let mut workflow_span = if telemetry_context.sampled {
3910        let (span_context, mut span) = tracer.start_span(
3911            "workflow.run",
3912            SpanKind::Workflow,
3913            parent_trace_context.as_ref(),
3914        );
3915        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
3916        apply_trace_tenant_attributes(span.as_mut(), options);
3917        workflow_span_context = Some(span_context);
3918        Some(span)
3919    } else {
3920        None
3921    };
3922
3923    struct NoopLlm;
3924    #[async_trait]
3925    impl LlmExecutor for NoopLlm {
3926        async fn execute(
3927            &self,
3928            _input: LlmExecutionInput,
3929        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3930            Err(LlmExecutionError::UnexpectedOutcome(
3931                "yaml_ir_uses_tool_path",
3932            ))
3933        }
3934    }
3935
3936    struct YamlIrToolExecutor<'a> {
3937        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
3938        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
3939        token_totals: std::sync::Mutex<YamlTokenTotals>,
3940        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
3941        node_models: std::sync::Mutex<BTreeMap<String, String>>,
3942        model_override: Option<String>,
3943        trace_id: Option<String>,
3944        trace_context: Option<TraceContext>,
3945        trace_input_context: Option<YamlWorkflowTraceContextInput>,
3946        tenant_context: YamlWorkflowTraceTenantContext,
3947        payload_mode: YamlWorkflowPayloadMode,
3948        trace_sampled: bool,
3949    }
3950
3951    #[async_trait]
3952    impl ToolExecutor for YamlIrToolExecutor<'_> {
3953        async fn execute_tool(
3954            &self,
3955            input: ToolExecutionInput,
3956        ) -> Result<Value, ToolExecutionError> {
3957            let context = build_yaml_context_from_ir_scope(&input.scoped_input);
3958
3959            if input.tool == YAML_LLM_TOOL_ID {
3960                let node_id = input
3961                    .input
3962                    .get("node_id")
3963                    .and_then(Value::as_str)
3964                    .ok_or_else(|| {
3965                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
3966                    })?
3967                    .to_string();
3968                let node_id_for_metrics = node_id.clone();
3969                let model = input
3970                    .input
3971                    .get("model")
3972                    .and_then(Value::as_str)
3973                    .ok_or_else(|| {
3974                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
3975                    })?
3976                    .to_string();
3977                let resolved_model =
3978                    resolve_requested_model(self.model_override.as_deref(), &model);
3979                let prompt_template = input
3980                    .input
3981                    .get("prompt_template")
3982                    .and_then(Value::as_str)
3983                    .unwrap_or_default()
3984                    .to_string();
3985                let stream = input
3986                    .input
3987                    .get("stream")
3988                    .and_then(Value::as_bool)
3989                    .unwrap_or(false);
3990                let heal = input
3991                    .input
3992                    .get("heal")
3993                    .and_then(Value::as_bool)
3994                    .unwrap_or(false);
3995                let append_prompt_as_user = input
3996                    .input
3997                    .get("append_prompt_as_user")
3998                    .and_then(Value::as_bool)
3999                    .unwrap_or(true);
4000                let messages_path = input
4001                    .input
4002                    .get("messages_path")
4003                    .and_then(Value::as_str)
4004                    .map(str::to_string);
4005
4006                let messages = if let Some(path) = messages_path.as_deref() {
4007                    Some(
4008                        parse_messages_from_context(path, &context)
4009                            .map_err(ToolExecutionError::Failed)?,
4010                    )
4011                } else {
4012                    None
4013                };
4014
4015                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
4016                let prompt = interpolate_template(&prompt_template, &context);
4017                let email_text = context
4018                    .get("input")
4019                    .and_then(|v| v.get("email_text"))
4020                    .and_then(Value::as_str)
4021                    .unwrap_or_default();
4022                let schema = input
4023                    .input
4024                    .get("output_schema")
4025                    .cloned()
4026                    .unwrap_or_else(default_llm_output_schema);
4027
4028                let request = YamlLlmExecutionRequest {
4029                    node_id,
4030                    is_terminal_node: false,
4031                    stream_json_as_text: input
4032                        .input
4033                        .get("stream_json_as_text")
4034                        .and_then(Value::as_bool)
4035                        .unwrap_or(false),
4036                    model: resolved_model.clone(),
4037                    messages,
4038                    append_prompt_as_user,
4039                    prompt,
4040                    prompt_template,
4041                    prompt_bindings,
4042                    schema,
4043                    stream,
4044                    heal,
4045                    tools: Vec::new(),
4046                    tool_choice: None,
4047                    max_tool_roundtrips: 1,
4048                    tool_calls_global_key: None,
4049                    tool_trace_mode: YamlToolTraceMode::Off,
4050                    execution_context: context.clone(),
4051                    email_text: email_text.to_string(),
4052                    trace_id: self.trace_id.clone(),
4053                    trace_context: self.trace_context.clone(),
4054                    tenant_context: self.tenant_context.clone(),
4055                    trace_sampled: self.trace_sampled,
4056                };
4057
4058                let llm_result = self
4059                    .llm_executor
4060                    .complete_structured(request, None)
4061                    .await
4062                    .map_err(ToolExecutionError::Failed);
4063
4064                if let Ok(ref result) = llm_result {
4065                    if let Some(usage) = result.usage.as_ref() {
4066                        if let Ok(mut totals) = self.token_totals.lock() {
4067                            totals.add_usage(usage);
4068                        }
4069                        if let Ok(mut usage_map) = self.node_usage.lock() {
4070                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
4071                        }
4072                    }
4073                    if let Ok(mut model_map) = self.node_models.lock() {
4074                        model_map.insert(node_id_for_metrics, resolved_model);
4075                    }
4076                }
4077
4078                return llm_result.map(|result| result.payload);
4079            }
4080
4081            let worker = self
4082                .custom_worker
4083                .ok_or_else(|| ToolExecutionError::NotFound {
4084                    tool: input.tool.clone(),
4085                })?;
4086
4087            let payload = input.input.clone();
4088            let email_text = context
4089                .get("input")
4090                .and_then(|v| v.get("email_text"))
4091                .and_then(Value::as_str)
4092                .unwrap_or_default();
4093
4094            let tracer = workflow_tracer();
4095            let mut handler_span_context: Option<TraceContext> = None;
4096            let mut handler_span = if self.trace_sampled {
4097                let (span_context, mut span) = tracer.start_span(
4098                    "handler.invoke",
4099                    SpanKind::Node,
4100                    self.trace_context.as_ref(),
4101                );
4102                handler_span_context = Some(span_context);
4103                apply_trace_identity_attributes(span.as_mut(), self.trace_id.as_deref());
4104                span.set_attribute("handler_name", input.tool.as_str());
4105                apply_trace_tenant_attributes_from_tenant(span.as_mut(), &self.tenant_context);
4106                span.set_attribute(
4107                    "node_input",
4108                    payload_for_span(self.payload_mode, &payload).as_str(),
4109                );
4110                Some(span)
4111            } else {
4112                None
4113            };
4114
4115            let trace_options = YamlWorkflowRunOptions {
4116                telemetry: YamlWorkflowTelemetryConfig::default(),
4117                trace: YamlWorkflowTraceOptions {
4118                    context: self.trace_input_context.clone(),
4119                    tenant: self.tenant_context.clone(),
4120                },
4121                model: None,
4122            };
4123            let worker_trace_context = merged_trace_context_for_worker(
4124                handler_span_context.as_ref(),
4125                self.trace_id.as_deref(),
4126                &trace_options,
4127            );
4128            let worker_context = custom_worker_context_with_trace(
4129                &context,
4130                &worker_trace_context,
4131                &self.tenant_context,
4132            );
4133
4134            let output_result = worker
4135                .execute(&input.tool, &payload, email_text, &worker_context)
4136                .await
4137                .map_err(ToolExecutionError::Failed);
4138
4139            if let Some(span) = handler_span.as_mut() {
4140                if output_result.is_ok() {
4141                    span.add_event("handler.success");
4142                } else {
4143                    span.add_event("handler.error");
4144                }
4145            }
4146
4147            if let Some(span) = handler_span.take() {
4148                span.end();
4149            }
4150
4151            output_result
4152        }
4153    }
4154
4155    let tool_executor = YamlIrToolExecutor {
4156        llm_executor: executor,
4157        custom_worker,
4158        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
4159        node_usage: std::sync::Mutex::new(BTreeMap::new()),
4160        node_models: std::sync::Mutex::new(BTreeMap::new()),
4161        model_override: options.model.clone(),
4162        trace_id: telemetry_context.trace_id.clone(),
4163        trace_context: workflow_span_context.clone(),
4164        trace_input_context: options.trace.context.clone(),
4165        tenant_context: options.trace.tenant.clone(),
4166        payload_mode: options.telemetry.payload_mode,
4167        trace_sampled: telemetry_context.sampled,
4168    };
4169
4170    let runtime_options = WorkflowRuntimeOptions {
4171        validate_before_run: false,
4172        ..WorkflowRuntimeOptions::default()
4173    };
4174    let runtime = WorkflowRuntime::new(ir, &NoopLlm, Some(&tool_executor), runtime_options);
4175
4176    let started = Instant::now();
4177    let result = match runtime.execute(workflow_input.clone(), None).await {
4178        Ok(result) => result,
4179        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
4180        Err(error) => {
4181            return Err(YamlWorkflowRunError::IrRuntime {
4182                message: error.to_string(),
4183            });
4184        }
4185    };
4186    let total_elapsed_ms = started.elapsed().as_millis();
4187
4188    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
4189    for (node_id, output) in result.node_outputs {
4190        if node_id == YAML_START_NODE_ID {
4191            continue;
4192        }
4193        outputs.insert(node_id, json!({"output": output}));
4194    }
4195
4196    let mut trace = Vec::new();
4197    let mut step_timings = Vec::new();
4198    let node_usage_map = tool_executor
4199        .node_usage
4200        .lock()
4201        .map(|usage| usage.clone())
4202        .unwrap_or_default();
4203    let llm_node_models = tool_executor
4204        .node_models
4205        .lock()
4206        .map(|models| models.clone())
4207        .unwrap_or_default();
4208    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
4209    for execution in result.node_executions {
4210        if execution.node_id == YAML_START_NODE_ID {
4211            continue;
4212        }
4213        let node_id = execution.node_id;
4214        trace.push(node_id.clone());
4215        let usage = node_usage_map.get(&node_id);
4216        if let Some(usage) = usage {
4217            llm_node_metrics.insert(
4218                node_id.clone(),
4219                YamlLlmNodeMetrics {
4220                    elapsed_ms: 0,
4221                    prompt_tokens: usage.prompt_tokens,
4222                    completion_tokens: usage.completion_tokens,
4223                    total_tokens: usage.total_tokens,
4224                    reasoning_tokens: usage.reasoning_tokens,
4225                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
4226                },
4227            );
4228        }
4229        step_timings.push(YamlStepTiming {
4230            node_id: node_id.clone(),
4231            node_kind: "ir_runtime".to_string(),
4232            model_name: llm_node_models.get(&node_id).cloned(),
4233            elapsed_ms: 0,
4234            prompt_tokens: usage.map(|value| value.prompt_tokens),
4235            completion_tokens: usage.map(|value| value.completion_tokens),
4236            total_tokens: usage.map(|value| value.total_tokens),
4237            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
4238            tokens_per_second: usage
4239                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
4240        });
4241    }
4242
4243    let terminal_node = result.terminal_node_id;
4244    let terminal_output = outputs
4245        .get(&terminal_node)
4246        .and_then(|v| v.get("output"))
4247        .cloned();
4248
4249    let email_text = workflow_input
4250        .get("email_text")
4251        .and_then(Value::as_str)
4252        .unwrap_or_default()
4253        .to_string();
4254
4255    let token_totals = tool_executor
4256        .token_totals
4257        .lock()
4258        .map(|totals| totals.clone())
4259        .unwrap_or_default();
4260
4261    let output = YamlWorkflowRunOutput {
4262        workflow_id: workflow.id.clone(),
4263        entry_node: workflow.entry_node.clone(),
4264        email_text,
4265        trace,
4266        outputs,
4267        terminal_node,
4268        terminal_output,
4269        step_timings,
4270        llm_node_metrics,
4271        llm_node_models,
4272        total_elapsed_ms,
4273        ttft_ms: None,
4274        total_input_tokens: token_totals.input_tokens,
4275        total_output_tokens: token_totals.output_tokens,
4276        total_tokens: token_totals.total_tokens,
4277        total_reasoning_tokens: token_totals.reasoning_tokens,
4278        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
4279        trace_id: telemetry_context.trace_id.clone(),
4280        metadata: telemetry_context.trace_id.as_ref().map(|value| {
4281            workflow_metadata_with_trace(
4282                options,
4283                value,
4284                telemetry_context.sampled,
4285                telemetry_context.trace_id_source,
4286            )
4287        }),
4288    };
4289
4290    if let Some(mut span) = workflow_span.take() {
4291        span.set_attribute("workflow_id", workflow.id.as_str());
4292        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
4293        apply_langfuse_trace_input_output_attributes(
4294            span.as_mut(),
4295            workflow_input,
4296            &output,
4297            options.telemetry.payload_mode,
4298        );
4299        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
4300        span.end();
4301        flush_workflow_tracer();
4302    }
4303
4304    Ok(Some(output))
4305}
4306
4307fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4308    let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4309
4310    let mut nodes = serde_json::Map::new();
4311    if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4312        for (node_id, output) in node_outputs {
4313            nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4314        }
4315    }
4316
4317    json!({
4318        "input": input,
4319        "nodes": Value::Object(nodes),
4320        "globals": Value::Object(serde_json::Map::new())
4321    })
4322}
4323
4324pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4325    workflow: &YamlWorkflow,
4326    email_text: &str,
4327    executor: &dyn YamlWorkflowLlmExecutor,
4328    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4329    event_sink: Option<&dyn YamlWorkflowEventSink>,
4330) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4331    let workflow_input = json!({ "email_text": email_text });
4332    run_workflow_yaml_with_custom_worker_and_events(
4333        workflow,
4334        &workflow_input,
4335        executor,
4336        custom_worker,
4337        event_sink,
4338    )
4339    .await
4340}
4341
4342fn evaluate_switch_condition(
4343    condition: &str,
4344    context: &Value,
4345) -> Result<bool, YamlWorkflowRunError> {
4346    let (left, right) =
4347        condition
4348            .split_once("==")
4349            .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4350                condition: condition.to_string(),
4351            })?;
4352
4353    let left_path = left.trim().trim_start_matches("$.");
4354    let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4355    let left_value = resolve_path(context, left_path);
4356    Ok(left_value
4357        .and_then(Value::as_str)
4358        .map(|value| value == right_literal)
4359        .unwrap_or(false))
4360}
4361
4362fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4363    let normalized_path = path.trim().trim_start_matches("$.");
4364    let value = resolve_path(context, normalized_path)
4365        .ok_or_else(|| format!("messages_path not found: {path}"))?;
4366    let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4367        format!("messages_path must resolve to a list of messages: {path}; {err}")
4368    })?;
4369    if list.is_empty() {
4370        return Err(format!(
4371            "messages_path must not resolve to an empty list: {path}"
4372        ));
4373    }
4374
4375    let mut messages = Vec::with_capacity(list.len());
4376    for (index, item) in list.into_iter().enumerate() {
4377        let mut message = match item.role {
4378            Role::System => Message::system(item.content),
4379            Role::User => Message::user(item.content),
4380            Role::Assistant => Message::assistant(item.content),
4381            Role::Tool => {
4382                let tool_call_id = item
4383                    .tool_call_id
4384                    .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4385                Message::tool(item.content, tool_call_id)
4386            }
4387        };
4388
4389        if let Some(name) = item.name {
4390            message = message.with_name(name);
4391        }
4392
4393        messages.push(message);
4394    }
4395
4396    Ok(messages)
4397}
4398
4399pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
4400    let mut diagnostics = Vec::new();
4401    let known_ids: HashMap<&str, &YamlNode> = workflow
4402        .nodes
4403        .iter()
4404        .map(|node| (node.id.as_str(), node))
4405        .collect();
4406
4407    if !known_ids.contains_key(workflow.entry_node.as_str()) {
4408        diagnostics.push(YamlWorkflowDiagnostic {
4409            node_id: None,
4410            code: "missing_entry".to_string(),
4411            severity: YamlWorkflowDiagnosticSeverity::Error,
4412            message: format!("entry node '{}' does not exist", workflow.entry_node),
4413        });
4414    }
4415
4416    for edge in &workflow.edges {
4417        if !known_ids.contains_key(edge.from.as_str()) {
4418            diagnostics.push(YamlWorkflowDiagnostic {
4419                node_id: Some(edge.from.clone()),
4420                code: "unknown_edge_from".to_string(),
4421                severity: YamlWorkflowDiagnosticSeverity::Error,
4422                message: format!("edge.from '{}' does not exist", edge.from),
4423            });
4424        }
4425        if !known_ids.contains_key(edge.to.as_str()) {
4426            diagnostics.push(YamlWorkflowDiagnostic {
4427                node_id: Some(edge.to.clone()),
4428                code: "unknown_edge_to".to_string(),
4429                severity: YamlWorkflowDiagnosticSeverity::Error,
4430                message: format!("edge.to '{}' does not exist", edge.to),
4431            });
4432        }
4433    }
4434
4435    for node in &workflow.nodes {
4436        if let Some(llm) = &node.node_type.llm_call {
4437            if llm.model.trim().is_empty() {
4438                diagnostics.push(YamlWorkflowDiagnostic {
4439                    node_id: Some(node.id.clone()),
4440                    code: "empty_model".to_string(),
4441                    severity: YamlWorkflowDiagnosticSeverity::Error,
4442                    message: "llm_call.model must not be empty".to_string(),
4443                });
4444            }
4445            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
4446                diagnostics.push(YamlWorkflowDiagnostic {
4447                    node_id: Some(node.id.clone()),
4448                    code: "stream_heal_conflict".to_string(),
4449                    severity: YamlWorkflowDiagnosticSeverity::Warning,
4450                    message:
4451                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
4452                            .to_string(),
4453                });
4454            }
4455
4456            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
4457                diagnostics.push(YamlWorkflowDiagnostic {
4458                    node_id: Some(node.id.clone()),
4459                    code: "invalid_max_tool_roundtrips".to_string(),
4460                    severity: YamlWorkflowDiagnosticSeverity::Error,
4461                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
4462                });
4463            }
4464
4465            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
4466                if global_key.trim().is_empty() {
4467                    diagnostics.push(YamlWorkflowDiagnostic {
4468                        node_id: Some(node.id.clone()),
4469                        code: "empty_tool_calls_global_key".to_string(),
4470                        severity: YamlWorkflowDiagnosticSeverity::Error,
4471                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
4472                    });
4473                }
4474            }
4475
4476            match normalize_tool_choice(llm.tool_choice.clone()) {
4477                Ok(choice) => {
4478                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
4479                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
4480                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4481                                openai.function.name == choice_tool.function.name
4482                            }
4483                            (
4484                                YamlToolFormat::Simplified,
4485                                YamlToolDeclaration::Simplified(simple),
4486                            ) => simple.name == choice_tool.function.name,
4487                            _ => false,
4488                        }) {
4489                            diagnostics.push(YamlWorkflowDiagnostic {
4490                                node_id: Some(node.id.clone()),
4491                                code: "unknown_tool_choice_function".to_string(),
4492                                severity: YamlWorkflowDiagnosticSeverity::Error,
4493                                message: format!(
4494                                    "llm_call.tool_choice references unknown function '{}'",
4495                                    choice_tool.function.name
4496                                ),
4497                            });
4498                        }
4499                    }
4500                }
4501                Err(message) => {
4502                    diagnostics.push(YamlWorkflowDiagnostic {
4503                        node_id: Some(node.id.clone()),
4504                        code: "invalid_tool_choice".to_string(),
4505                        severity: YamlWorkflowDiagnosticSeverity::Error,
4506                        message,
4507                    });
4508                }
4509            }
4510
4511            let normalized_tools = match normalize_llm_tools(llm) {
4512                Ok(tools) => tools,
4513                Err(message) => {
4514                    diagnostics.push(YamlWorkflowDiagnostic {
4515                        node_id: Some(node.id.clone()),
4516                        code: "invalid_tools_format".to_string(),
4517                        severity: YamlWorkflowDiagnosticSeverity::Error,
4518                        message,
4519                    });
4520                    Vec::new()
4521                }
4522            };
4523
4524            let mut seen_tool_names = HashSet::new();
4525            for tool in &normalized_tools {
4526                let name = tool.definition.function.name.trim();
4527                if name.is_empty() {
4528                    diagnostics.push(YamlWorkflowDiagnostic {
4529                        node_id: Some(node.id.clone()),
4530                        code: "empty_tool_name".to_string(),
4531                        severity: YamlWorkflowDiagnosticSeverity::Error,
4532                        message: "tool function name must not be empty".to_string(),
4533                    });
4534                }
4535                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
4536                    diagnostics.push(YamlWorkflowDiagnostic {
4537                        node_id: Some(node.id.clone()),
4538                        code: "duplicate_tool_name".to_string(),
4539                        severity: YamlWorkflowDiagnosticSeverity::Error,
4540                        message: format!(
4541                            "duplicate tool function name '{}' in node",
4542                            tool.definition.function.name
4543                        ),
4544                    });
4545                }
4546
4547                let schema = tool
4548                    .definition
4549                    .function
4550                    .parameters
4551                    .clone()
4552                    .unwrap_or(Value::Null);
4553                if schema.is_null() {
4554                    diagnostics.push(YamlWorkflowDiagnostic {
4555                        node_id: Some(node.id.clone()),
4556                        code: "missing_tool_input_schema".to_string(),
4557                        severity: YamlWorkflowDiagnosticSeverity::Error,
4558                        message: format!(
4559                            "tool '{}' is missing input schema",
4560                            tool.definition.function.name
4561                        ),
4562                    });
4563                } else if let Err(message) = validate_json_schema(&schema) {
4564                    diagnostics.push(YamlWorkflowDiagnostic {
4565                        node_id: Some(node.id.clone()),
4566                        code: "invalid_tool_input_schema".to_string(),
4567                        severity: YamlWorkflowDiagnosticSeverity::Error,
4568                        message: format!(
4569                            "tool '{}' has invalid input schema: {}",
4570                            tool.definition.function.name, message
4571                        ),
4572                    });
4573                }
4574
4575                if let Some(output_schema) = tool.output_schema.as_ref() {
4576                    if let Err(message) = validate_json_schema(output_schema) {
4577                        diagnostics.push(YamlWorkflowDiagnostic {
4578                            node_id: Some(node.id.clone()),
4579                            code: "invalid_tool_output_schema".to_string(),
4580                            severity: YamlWorkflowDiagnosticSeverity::Error,
4581                            message: format!(
4582                                "tool '{}' has invalid output schema: {}",
4583                                tool.definition.function.name, message
4584                            ),
4585                        });
4586                    }
4587                }
4588            }
4589        }
4590
4591        if let Some(switch) = &node.node_type.switch {
4592            for branch in &switch.branches {
4593                if !known_ids.contains_key(branch.target.as_str()) {
4594                    diagnostics.push(YamlWorkflowDiagnostic {
4595                        node_id: Some(node.id.clone()),
4596                        code: "unknown_switch_target".to_string(),
4597                        severity: YamlWorkflowDiagnosticSeverity::Error,
4598                        message: format!("switch branch target '{}' does not exist", branch.target),
4599                    });
4600                }
4601            }
4602            if !known_ids.contains_key(switch.default.as_str()) {
4603                diagnostics.push(YamlWorkflowDiagnostic {
4604                    node_id: Some(node.id.clone()),
4605                    code: "unknown_switch_default".to_string(),
4606                    severity: YamlWorkflowDiagnosticSeverity::Error,
4607                    message: format!("switch default target '{}' does not exist", switch.default),
4608                });
4609            }
4610        }
4611
4612        if let Some(config) = node.config.as_ref() {
4613            if let Some(update_globals) = config.update_globals.as_ref() {
4614                for (key, update) in update_globals {
4615                    let is_valid_op =
4616                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
4617                    if !is_valid_op {
4618                        diagnostics.push(YamlWorkflowDiagnostic {
4619                            node_id: Some(node.id.clone()),
4620                            code: "unknown_update_op".to_string(),
4621                            severity: YamlWorkflowDiagnosticSeverity::Error,
4622                            message: format!(
4623                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
4624                                key, update.op
4625                            ),
4626                        });
4627                    }
4628
4629                    if update.op != "increment" && update.from.is_none() {
4630                        diagnostics.push(YamlWorkflowDiagnostic {
4631                            node_id: Some(node.id.clone()),
4632                            code: "missing_update_from".to_string(),
4633                            severity: YamlWorkflowDiagnosticSeverity::Error,
4634                            message: format!(
4635                                "update_globals key '{}' with op '{}' requires 'from'",
4636                                key, update.op
4637                            ),
4638                        });
4639                    }
4640                }
4641            }
4642        }
4643    }
4644
4645    diagnostics
4646}
4647
4648fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4649    path.split('.')
4650        .filter(|segment| !segment.is_empty())
4651        .try_fold(value, |current, segment| {
4652            if let Ok(index) = segment.parse::<usize>() {
4653                return current.get(index);
4654            }
4655            current.get(segment)
4656        })
4657}
4658
4659fn interpolate_template(template: &str, context: &Value) -> String {
4660    let mut out = String::with_capacity(template.len());
4661    let mut rest = template;
4662
4663    loop {
4664        let Some(start) = rest.find("{{") else {
4665            out.push_str(rest);
4666            break;
4667        };
4668
4669        out.push_str(&rest[..start]);
4670        let after_start = &rest[start + 2..];
4671        let Some(end) = after_start.find("}}") else {
4672            out.push_str(&rest[start..]);
4673            break;
4674        };
4675
4676        let expr = after_start[..end].trim();
4677        let source_path = expr.trim_start_matches("$.");
4678        let replacement = resolve_path(context, source_path)
4679            .map(value_to_template_string)
4680            .unwrap_or_default();
4681        out.push_str(replacement.as_str());
4682
4683        rest = &after_start[end + 2..];
4684    }
4685
4686    out
4687}
4688
4689fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4690    let mut bindings = Vec::new();
4691    let mut rest = template;
4692
4693    loop {
4694        let Some(start) = rest.find("{{") else {
4695            break;
4696        };
4697
4698        let after_start = &rest[start + 2..];
4699        let Some(end) = after_start.find("}}") else {
4700            break;
4701        };
4702
4703        let expr = after_start[..end].trim();
4704        let source_path = expr.trim_start_matches("$.").to_string();
4705        let resolved = resolve_path(context, source_path.as_str()).cloned();
4706        let missing = resolved.is_none();
4707        let resolved_value = resolved.unwrap_or(Value::Null);
4708        bindings.push(YamlTemplateBinding {
4709            index: bindings.len(),
4710            expression: expr.to_string(),
4711            source_path,
4712            resolved_type: json_type_name(&resolved_value).to_string(),
4713            missing,
4714            resolved: resolved_value,
4715        });
4716
4717        rest = &after_start[end + 2..];
4718    }
4719
4720    bindings
4721}
4722
4723fn json_type_name(value: &Value) -> &'static str {
4724    match value {
4725        Value::Null => "null",
4726        Value::Bool(_) => "bool",
4727        Value::Number(_) => "number",
4728        Value::String(_) => "string",
4729        Value::Array(_) => "array",
4730        Value::Object(_) => "object",
4731    }
4732}
4733
4734fn value_to_template_string(value: &Value) -> String {
4735    match value {
4736        Value::Null => String::new(),
4737        Value::Bool(v) => v.to_string(),
4738        Value::Number(v) => v.to_string(),
4739        Value::String(v) => v.clone(),
4740        Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4741    }
4742}
4743
4744fn apply_set_globals(
4745    node: &YamlNode,
4746    outputs: &BTreeMap<String, Value>,
4747    workflow_input: &Value,
4748    globals: &mut serde_json::Map<String, Value>,
4749) {
4750    let Some(config) = node.config.as_ref() else {
4751        return;
4752    };
4753    let Some(set_globals) = config.set_globals.as_ref() else {
4754        return;
4755    };
4756
4757    let context = json!({
4758        "input": workflow_input,
4759        "nodes": outputs,
4760        "globals": Value::Object(globals.clone())
4761    });
4762
4763    for (key, expr) in set_globals {
4764        let value = resolve_path(&context, expr.as_str())
4765            .cloned()
4766            .unwrap_or(Value::Null);
4767        globals.insert(key.clone(), value);
4768    }
4769}
4770
4771fn apply_update_globals(
4772    node: &YamlNode,
4773    outputs: &BTreeMap<String, Value>,
4774    workflow_input: &Value,
4775    globals: &mut serde_json::Map<String, Value>,
4776) {
4777    let Some(config) = node.config.as_ref() else {
4778        return;
4779    };
4780    let Some(update_globals) = config.update_globals.as_ref() else {
4781        return;
4782    };
4783
4784    let context = json!({
4785        "input": workflow_input,
4786        "nodes": outputs,
4787        "globals": Value::Object(globals.clone())
4788    });
4789
4790    for (key, update) in update_globals {
4791        match update.op.as_str() {
4792            "set" => {
4793                if let Some(path) = update.from.as_ref() {
4794                    let value = resolve_path(&context, path.as_str())
4795                        .cloned()
4796                        .unwrap_or(Value::Null);
4797                    globals.insert(key.clone(), value);
4798                }
4799            }
4800            "append" => {
4801                if let Some(path) = update.from.as_ref() {
4802                    let value = resolve_path(&context, path.as_str())
4803                        .cloned()
4804                        .unwrap_or(Value::Null);
4805                    let entry = globals
4806                        .entry(key.clone())
4807                        .or_insert_with(|| Value::Array(Vec::new()));
4808                    match entry {
4809                        Value::Array(items) => items.push(value),
4810                        other => {
4811                            let existing = other.clone();
4812                            *other = Value::Array(vec![existing, value]);
4813                        }
4814                    }
4815                }
4816            }
4817            "increment" => {
4818                let by = update.by.unwrap_or(1.0);
4819                let current = globals
4820                    .get(key.as_str())
4821                    .and_then(Value::as_f64)
4822                    .unwrap_or(0.0);
4823                if let Some(next) = serde_json::Number::from_f64(current + by) {
4824                    globals.insert(key.clone(), Value::Number(next));
4825                }
4826            }
4827            "merge" => {
4828                if let Some(path) = update.from.as_ref() {
4829                    let source = resolve_path(&context, path.as_str())
4830                        .cloned()
4831                        .unwrap_or(Value::Null);
4832                    if let Value::Object(source_map) = source {
4833                        let target = globals
4834                            .entry(key.clone())
4835                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
4836                        if let Value::Object(target_map) = target {
4837                            target_map.extend(source_map);
4838                        } else {
4839                            *target = Value::Object(source_map);
4840                        }
4841                    }
4842                }
4843            }
4844            _ => {}
4845        }
4846    }
4847}
4848
4849fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4850    if let Some(schema) = node
4851        .config
4852        .as_ref()
4853        .and_then(|cfg| cfg.output_schema.clone())
4854    {
4855        return schema;
4856    }
4857
4858    default_llm_output_schema()
4859}
4860
4861fn normalize_tool_choice(
4862    config: Option<YamlToolChoiceConfig>,
4863) -> Result<Option<ToolChoice>, String> {
4864    let Some(config) = config else {
4865        return Ok(None);
4866    };
4867
4868    let choice = match config {
4869        YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4870        YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4871            tool_type: ToolType::Function,
4872            function: ToolChoiceFunction { name: function },
4873        }),
4874        YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4875    };
4876
4877    Ok(Some(choice))
4878}
4879
4880fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4881    llm.tools
4882        .iter()
4883        .map(|tool| match (llm.tools_format, tool) {
4884            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4885                let definition = ToolDefinition {
4886                    tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4887                    function: ToolFunction {
4888                        name: openai.function.name.clone(),
4889                        description: openai.function.description.clone(),
4890                        parameters: openai.function.parameters.clone(),
4891                    },
4892                };
4893                Ok(YamlResolvedTool {
4894                    definition,
4895                    output_schema: openai.function.output_schema.clone(),
4896                })
4897            }
4898            (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4899                let definition = ToolDefinition {
4900                    tool_type: ToolType::Function,
4901                    function: ToolFunction {
4902                        name: simple.name.clone(),
4903                        description: simple.description.clone(),
4904                        parameters: Some(simple.input_schema.clone()),
4905                    },
4906                };
4907                Ok(YamlResolvedTool {
4908                    definition,
4909                    output_schema: simple.output_schema.clone(),
4910                })
4911            }
4912            (YamlToolFormat::Openai, _) => {
4913                Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4914            }
4915            (YamlToolFormat::Simplified, _) => {
4916                Err("tools_format=simplified requires simplified tool declarations".to_string())
4917            }
4918        })
4919        .collect()
4920}
4921
4922fn default_llm_output_schema() -> Value {
4923    json!({
4924        "type": "object",
4925        "additionalProperties": true
4926    })
4927}
4928
4929fn mock_rag(topic: &str) -> Value {
4930    let (kb_source, playbook) = match topic {
4931        "probation" => (
4932            "hr_policy/probation.md",
4933            "Collect manager review, performance evidence, and probation timeline.",
4934        ),
4935        "leave_request" => (
4936            "hr_policy/leave.md",
4937            "Validate leave balance, manager approval, and blackout dates.",
4938        ),
4939        "supply_chain_order_assessment" => (
4940            "supply_chain/order_assessment.md",
4941            "Review order specs, inventory risk, and vendor lead-time guidance.",
4942        ),
4943        "supply_chain_order_replacement" => (
4944            "supply_chain/order_replacement.md",
4945            "Collect order id, damage proof, and replacement SLA policy.",
4946        ),
4947        "termination_first_time_offense" => (
4948            "hr_policy/termination_first_offense.md",
4949            "Validate first-incident criteria and route to HRBP review.",
4950        ),
4951        "termination_repeated_offense" => (
4952            "hr_policy/termination_repeated_offense.md",
4953            "Collect prior warnings and escalation approvals before final action.",
4954        ),
4955        _ => (
4956            "shared/request_clarification.md",
4957            "Request clarifying details before routing.",
4958        ),
4959    };
4960
4961    json!({
4962        "kb_source": kb_source,
4963        "playbook": playbook,
4964    })
4965}
4966
4967async fn execute_subworkflow_tool_call(
4968    payload: &Value,
4969    context: &Value,
4970    client: &SimpleAgentsClient,
4971    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4972    parent_options: &YamlWorkflowRunOptions,
4973    parent_trace_context: Option<&TraceContext>,
4974    resolved_trace_id: Option<&str>,
4975) -> Result<Value, String> {
4976    let workflow_id = payload
4977        .get("workflow_id")
4978        .and_then(Value::as_str)
4979        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;
4980
4981    let input_context = context
4982        .get("input")
4983        .and_then(Value::as_object)
4984        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;
4985
4986    let registry = input_context
4987        .get("workflow_registry")
4988        .and_then(Value::as_object)
4989        .ok_or_else(|| {
4990            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
4991                .to_string()
4992        })?;
4993
4994    let workflow_path = registry
4995        .get(workflow_id)
4996        .and_then(Value::as_str)
4997        .ok_or_else(|| {
4998            format!(
4999                "workflow_registry has no entry for workflow_id '{}'",
5000                workflow_id
5001            )
5002        })?;
5003
5004    let parent_depth = input_context
5005        .get("__subgraph_depth")
5006        .and_then(Value::as_u64)
5007        .unwrap_or(0);
5008    let max_depth = input_context
5009        .get("__subgraph_max_depth")
5010        .and_then(Value::as_u64)
5011        .unwrap_or(3);
5012
5013    if parent_depth >= max_depth {
5014        return Err(format!(
5015            "run_workflow_graph depth limit reached (depth={}, max={})",
5016            parent_depth, max_depth
5017        ));
5018    }
5019
5020    let mut subworkflow_input = payload
5021        .get("input")
5022        .and_then(Value::as_object)
5023        .cloned()
5024        .unwrap_or_default();
5025
5026    if !subworkflow_input.contains_key("messages") {
5027        if let Some(messages) = input_context.get("messages") {
5028            subworkflow_input.insert("messages".to_string(), messages.clone());
5029        }
5030    }
5031
5032    if !subworkflow_input.contains_key("email_text") {
5033        if let Some(email_text) = input_context.get("email_text") {
5034            subworkflow_input.insert("email_text".to_string(), email_text.clone());
5035        }
5036    }
5037
5038    if !subworkflow_input.contains_key("workflow_registry") {
5039        subworkflow_input.insert(
5040            "workflow_registry".to_string(),
5041            Value::Object(registry.clone()),
5042        );
5043    }
5044
5045    subworkflow_input.insert(
5046        "__subgraph_depth".to_string(),
5047        Value::Number(serde_json::Number::from(parent_depth + 1)),
5048    );
5049    subworkflow_input.insert(
5050        "__subgraph_max_depth".to_string(),
5051        Value::Number(serde_json::Number::from(max_depth)),
5052    );
5053
5054    let subworkflow_options =
5055        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);
5056
5057    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
5058        Path::new(workflow_path),
5059        &Value::Object(subworkflow_input),
5060        client,
5061        custom_worker,
5062        None,
5063        &subworkflow_options,
5064    )
5065    .await
5066    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;
5067
5068    Ok(json!({
5069        "workflow_id": workflow_id,
5070        "workflow_path": workflow_path,
5071        "terminal_node": output.terminal_node,
5072        "terminal_output": output.terminal_output,
5073        "trace": output.trace,
5074    }))
5075}
5076
5077fn build_subworkflow_options(
5078    parent_options: &YamlWorkflowRunOptions,
5079    parent_trace_context: Option<&TraceContext>,
5080    resolved_trace_id: Option<&str>,
5081) -> YamlWorkflowRunOptions {
5082    let mut subworkflow_options = parent_options.clone();
5083    if parent_trace_context.is_some() || resolved_trace_id.is_some() {
5084        let trace_context = YamlWorkflowTraceContextInput {
5085            trace_id: resolved_trace_id
5086                .map(|value| value.to_string())
5087                .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
5088            span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
5089            parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
5090            traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
5091            tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
5092            baggage: parent_trace_context
5093                .map(|ctx| ctx.baggage.clone())
5094                .unwrap_or_default(),
5095        };
5096        subworkflow_options.trace.context = Some(trace_context);
5097    }
5098    subworkflow_options
5099}
5100
5101fn mock_custom_worker_output(
5102    handler: &str,
5103    payload: &Value,
5104) -> Result<Value, YamlWorkflowRunError> {
5105    if handler == "get_employee_record" {
5106        let employee_name = payload
5107            .get("employee_name")
5108            .and_then(Value::as_str)
5109            .unwrap_or("Unknown Employee")
5110            .trim();
5111        let normalized_name = if employee_name.is_empty() {
5112            "Unknown Employee"
5113        } else {
5114            employee_name
5115        };
5116
5117        let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
5118            "alex johnson" => ("EMP-2041", "Austin"),
5119            "priya sharma" => ("EMP-3378", "Bengaluru"),
5120            "marcus lee" => ("EMP-1196", "Singapore"),
5121            "sarah chen" => ("EMP-4450", "Toronto"),
5122            _ => ("EMP-0000", "Unassigned"),
5123        };
5124
5125        return Ok(json!({
5126            "employee_name": normalized_name,
5127            "employee_id": employee_id,
5128            "location": location,
5129        }));
5130    }
5131
5132    if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
5133        let mut value = mock_rag(topic);
5134        if let Value::Object(object) = &mut value {
5135            object.insert("handler".to_string(), Value::String(handler.to_string()));
5136        }
5137        return Ok(value);
5138    }
5139
5140    Err(YamlWorkflowRunError::UnsupportedCustomHandler {
5141        handler: handler.to_string(),
5142    })
5143}
5144
5145#[derive(Debug, Clone, Deserialize)]
5146pub struct YamlWorkflow {
5147    pub id: String,
5148    pub entry_node: String,
5149    #[serde(default)]
5150    pub nodes: Vec<YamlNode>,
5151    #[serde(default)]
5152    pub edges: Vec<YamlEdge>,
5153}
5154
5155#[derive(Debug, Clone, Deserialize)]
5156pub struct YamlNode {
5157    pub id: String,
5158    pub node_type: YamlNodeType,
5159    pub config: Option<YamlNodeConfig>,
5160}
5161
5162impl YamlNode {
5163    fn kind_name(&self) -> &'static str {
5164        if self.node_type.llm_call.is_some() {
5165            "llm_call"
5166        } else if self.node_type.switch.is_some() {
5167            "switch"
5168        } else if self.node_type.custom_worker.is_some() {
5169            "custom_worker"
5170        } else {
5171            "unknown"
5172        }
5173    }
5174}
5175
5176#[derive(Debug, Clone, Deserialize)]
5177pub struct YamlNodeType {
5178    pub llm_call: Option<YamlLlmCall>,
5179    pub switch: Option<YamlSwitch>,
5180    pub custom_worker: Option<YamlCustomWorker>,
5181}
5182
5183#[derive(Debug, Clone, Deserialize)]
5184pub struct YamlLlmCall {
5185    pub model: String,
5186    pub stream: Option<bool>,
5187    pub stream_json_as_text: Option<bool>,
5188    pub heal: Option<bool>,
5189    pub messages_path: Option<String>,
5190    pub append_prompt_as_user: Option<bool>,
5191    #[serde(default)]
5192    pub tools_format: YamlToolFormat,
5193    #[serde(default)]
5194    pub tools: Vec<YamlToolDeclaration>,
5195    pub tool_choice: Option<YamlToolChoiceConfig>,
5196    pub max_tool_roundtrips: Option<u8>,
5197    pub tool_calls_global_key: Option<String>,
5198}
5199
5200#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
5201#[serde(rename_all = "snake_case")]
5202pub enum YamlToolFormat {
5203    #[default]
5204    Openai,
5205    Simplified,
5206}
5207
5208#[derive(Debug, Clone, Deserialize)]
5209#[serde(untagged)]
5210pub enum YamlToolDeclaration {
5211    OpenAi(YamlOpenAiToolDeclaration),
5212    Simplified(YamlSimplifiedToolDeclaration),
5213}
5214
5215#[derive(Debug, Clone, Deserialize)]
5216pub struct YamlOpenAiToolDeclaration {
5217    #[serde(rename = "type")]
5218    pub tool_type: Option<ToolType>,
5219    pub function: YamlOpenAiToolFunction,
5220}
5221
5222#[derive(Debug, Clone, Deserialize)]
5223pub struct YamlOpenAiToolFunction {
5224    pub name: String,
5225    pub description: Option<String>,
5226    pub parameters: Option<Value>,
5227    pub output_schema: Option<Value>,
5228}
5229
5230#[derive(Debug, Clone, Deserialize)]
5231pub struct YamlSimplifiedToolDeclaration {
5232    pub name: String,
5233    pub description: Option<String>,
5234    pub input_schema: Value,
5235    pub output_schema: Option<Value>,
5236}
5237
5238#[derive(Debug, Clone, Deserialize)]
5239#[serde(untagged)]
5240pub enum YamlToolChoiceConfig {
5241    Mode(ToolChoiceMode),
5242    Function { function: String },
5243    OpenAi(ToolChoiceTool),
5244}
5245
5246#[derive(Debug, Clone, Deserialize)]
5247pub struct YamlSwitch {
5248    #[serde(default)]
5249    pub branches: Vec<YamlSwitchBranch>,
5250    pub default: String,
5251}
5252
5253#[derive(Debug, Clone, Deserialize)]
5254pub struct YamlSwitchBranch {
5255    pub condition: String,
5256    pub target: String,
5257}
5258
5259#[derive(Debug, Clone, Deserialize)]
5260pub struct YamlCustomWorker {
5261    pub handler: String,
5262}
5263
5264#[derive(Debug, Clone, Deserialize)]
5265pub struct YamlNodeConfig {
5266    pub prompt: Option<String>,
5267    #[serde(default, alias = "schema")]
5268    pub output_schema: Option<Value>,
5269    pub payload: Option<Value>,
5270    pub set_globals: Option<HashMap<String, String>>,
5271    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
5272}
5273
5274#[derive(Debug, Clone, Deserialize)]
5275pub struct YamlGlobalUpdate {
5276    pub op: String,
5277    pub from: Option<String>,
5278    pub by: Option<f64>,
5279}
5280
5281#[derive(Debug, Clone, Deserialize)]
5282pub struct YamlEdge {
5283    pub from: String,
5284    pub to: String,
5285}
5286
5287#[cfg(test)]
5288mod tests {
5289    use super::*;
5290    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5291    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5292    use simple_agent_type::tool::{ToolCallFunction, ToolType};
5293    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5294    use simple_agents_core::SimpleAgentsClientBuilder;
5295    use std::collections::BTreeMap;
5296    use std::fs;
5297    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5298    use std::sync::{Arc, Mutex, OnceLock};
5299
5300    fn stream_debug_env_lock() -> &'static Mutex<()> {
5301        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5302        LOCK.get_or_init(|| Mutex::new(()))
5303    }
5304
5305    struct MockExecutor;
5306
5307    struct RecordingSink {
5308        events: Mutex<Vec<YamlWorkflowEvent>>,
5309    }
5310
5311    struct CancelAfterFirstEventSink {
5312        cancelled: AtomicBool,
5313    }
5314
5315    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
5316        fn emit(&self, _event: &YamlWorkflowEvent) {
5317            self.cancelled.store(true, Ordering::SeqCst);
5318        }
5319
5320        fn is_cancelled(&self) -> bool {
5321            self.cancelled.load(Ordering::SeqCst)
5322        }
5323    }
5324
5325    struct CountingExecutor {
5326        call_count: AtomicUsize,
5327    }
5328
5329    struct CapturingWorker {
5330        context: Mutex<Option<Value>>,
5331    }
5332
5333    struct ToolLoopProvider;
5334
5335    struct UnknownToolProvider;
5336
5337    struct ReasoningUsageProvider;
5338
5339    struct ToolLoopReasoningProvider;
5340
5341    #[derive(Default)]
5342    struct CapturingSpan {
5343        attributes: BTreeMap<String, String>,
5344    }
5345
5346    impl WorkflowSpan for CapturingSpan {
5347        fn set_attribute(&mut self, key: &str, value: &str) {
5348            self.attributes.insert(key.to_string(), value.to_string());
5349        }
5350
5351        fn add_event(&mut self, _name: &str) {}
5352
5353        fn end(self: Box<Self>) {}
5354    }
5355
5356    #[async_trait]
5357    impl Provider for ToolLoopProvider {
5358        fn name(&self) -> &str {
5359            "openai"
5360        }
5361
5362        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5363            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5364            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
5365        }
5366
5367        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5368            let request: CompletionRequest =
5369                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5370
5371            let has_tools = request
5372                .tools
5373                .as_ref()
5374                .is_some_and(|tools| !tools.is_empty());
5375            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
5376
5377            let response = if has_tools && !has_tool_result {
5378                CompletionResponse {
5379                    id: "resp_tool_1".to_string(),
5380                    model: request.model.clone(),
5381                    choices: vec![CompletionChoice {
5382                        index: 0,
5383                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
5384                            id: "call_get_context".to_string(),
5385                            tool_type: ToolType::Function,
5386                            function: ToolCallFunction {
5387                                name: "get_customer_context".to_string(),
5388                                arguments: "{\"order_id\":\"123\"}".to_string(),
5389                            },
5390                        }]),
5391                        finish_reason: FinishReason::ToolCalls,
5392                        logprobs: None,
5393                    }],
5394                    usage: Usage::new(10, 5),
5395                    created: None,
5396                    provider: Some(self.name().to_string()),
5397                    healing_metadata: None,
5398                }
5399            } else if has_tools && has_tool_result {
5400                CompletionResponse {
5401                    id: "resp_tool_2".to_string(),
5402                    model: request.model.clone(),
5403                    choices: vec![CompletionChoice {
5404                        index: 0,
5405                        message: Message::assistant("{\"state\":\"done\"}"),
5406                        finish_reason: FinishReason::Stop,
5407                        logprobs: None,
5408                    }],
5409                    usage: Usage::new(12, 6),
5410                    created: None,
5411                    provider: Some(self.name().to_string()),
5412                    healing_metadata: None,
5413                }
5414            } else {
5415                let prompt = request
5416                    .messages
5417                    .iter()
5418                    .rev()
5419                    .find(|m| m.role == Role::User)
5420                    .map(|m| m.content.clone())
5421                    .unwrap_or_default();
5422                let payload = json!({
5423                    "subject": "ok",
5424                    "body": prompt,
5425                })
5426                .to_string();
5427                CompletionResponse {
5428                    id: "resp_final".to_string(),
5429                    model: request.model.clone(),
5430                    choices: vec![CompletionChoice {
5431                        index: 0,
5432                        message: Message::assistant(payload),
5433                        finish_reason: FinishReason::Stop,
5434                        logprobs: None,
5435                    }],
5436                    usage: Usage::new(8, 4),
5437                    created: None,
5438                    provider: Some(self.name().to_string()),
5439                    healing_metadata: None,
5440                }
5441            };
5442
5443            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5444            Ok(ProviderResponse::new(200, body))
5445        }
5446
5447        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5448            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5449        }
5450    }
5451
5452    #[async_trait]
5453    impl Provider for UnknownToolProvider {
5454        fn name(&self) -> &str {
5455            "openai"
5456        }
5457
5458        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5459            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5460            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
5461        }
5462
5463        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5464            let request: CompletionRequest =
5465                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5466
5467            let response = CompletionResponse {
5468                id: "resp_unknown_tool".to_string(),
5469                model: request.model,
5470                choices: vec![CompletionChoice {
5471                    index: 0,
5472                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
5473                        id: "call_unknown".to_string(),
5474                        tool_type: ToolType::Function,
5475                        function: ToolCallFunction {
5476                            name: "unknown_tool".to_string(),
5477                            arguments: "{}".to_string(),
5478                        },
5479                    }]),
5480                    finish_reason: FinishReason::ToolCalls,
5481                    logprobs: None,
5482                }],
5483                usage: Usage::new(5, 2),
5484                created: None,
5485                provider: Some(self.name().to_string()),
5486                healing_metadata: None,
5487            };
5488
5489            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5490            Ok(ProviderResponse::new(200, body))
5491        }
5492
5493        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5494            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5495        }
5496    }
5497
5498    #[async_trait]
5499    impl Provider for ReasoningUsageProvider {
5500        fn name(&self) -> &str {
5501            "openai"
5502        }
5503
5504        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5505            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5506            Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
5507        }
5508
5509        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5510            let request: CompletionRequest =
5511                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5512
5513            let mut usage = Usage::new(9, 5);
5514            usage.reasoning_tokens = Some(4);
5515            let response = CompletionResponse {
5516                id: "resp_reasoning".to_string(),
5517                model: request.model,
5518                choices: vec![CompletionChoice {
5519                    index: 0,
5520                    message: Message::assistant("{\"state\":\"ok\"}"),
5521                    finish_reason: FinishReason::Stop,
5522                    logprobs: None,
5523                }],
5524                usage,
5525                created: None,
5526                provider: Some(self.name().to_string()),
5527                healing_metadata: None,
5528            };
5529
5530            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5531            Ok(ProviderResponse::new(200, body))
5532        }
5533
5534        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5535            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5536        }
5537    }
5538
5539    #[async_trait]
5540    impl Provider for ToolLoopReasoningProvider {
5541        fn name(&self) -> &str {
5542            "openai"
5543        }
5544
5545        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5546            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5547            Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
5548        }
5549
5550        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5551            let request: CompletionRequest =
5552                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5553
5554            let has_tools = request
5555                .tools
5556                .as_ref()
5557                .is_some_and(|tools| !tools.is_empty());
5558            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
5559
5560            let response = if has_tools && !has_tool_result {
5561                let mut usage = Usage::new(10, 5);
5562                usage.reasoning_tokens = Some(2);
5563                CompletionResponse {
5564                    id: "resp_tool_reasoning_1".to_string(),
5565                    model: request.model.clone(),
5566                    choices: vec![CompletionChoice {
5567                        index: 0,
5568                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
5569                            id: "call_get_context".to_string(),
5570                            tool_type: ToolType::Function,
5571                            function: ToolCallFunction {
5572                                name: "get_customer_context".to_string(),
5573                                arguments: "{\"order_id\":\"123\"}".to_string(),
5574                            },
5575                        }]),
5576                        finish_reason: FinishReason::ToolCalls,
5577                        logprobs: None,
5578                    }],
5579                    usage,
5580                    created: None,
5581                    provider: Some(self.name().to_string()),
5582                    healing_metadata: None,
5583                }
5584            } else {
5585                let mut usage = Usage::new(12, 6);
5586                usage.reasoning_tokens = Some(3);
5587                CompletionResponse {
5588                    id: "resp_tool_reasoning_2".to_string(),
5589                    model: request.model,
5590                    choices: vec![CompletionChoice {
5591                        index: 0,
5592                        message: Message::assistant("{\"state\":\"done\"}"),
5593                        finish_reason: FinishReason::Stop,
5594                        logprobs: None,
5595                    }],
5596                    usage,
5597                    created: None,
5598                    provider: Some(self.name().to_string()),
5599                    healing_metadata: None,
5600                }
5601            };
5602
5603            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5604            Ok(ProviderResponse::new(200, body))
5605        }
5606
5607        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5608            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5609        }
5610    }
5611
5612    struct FixedToolWorker {
5613        payload: Value,
5614    }
5615
5616    struct CountingToolWorker {
5617        execute_calls: AtomicUsize,
5618    }
5619
5620    #[async_trait]
5621    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
5622        async fn execute(
5623            &self,
5624            _handler: &str,
5625            _payload: &Value,
5626            _email_text: &str,
5627            _context: &Value,
5628        ) -> Result<Value, String> {
5629            Ok(self.payload.clone())
5630        }
5631    }
5632
5633    #[async_trait]
5634    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
5635        async fn execute(
5636            &self,
5637            _handler: &str,
5638            _payload: &Value,
5639            _email_text: &str,
5640            _context: &Value,
5641        ) -> Result<Value, String> {
5642            self.execute_calls.fetch_add(1, Ordering::SeqCst);
5643            Ok(json!({"ok": true}))
5644        }
5645    }
5646
5647    #[async_trait]
5648    impl YamlWorkflowLlmExecutor for CountingExecutor {
5649        async fn complete_structured(
5650            &self,
5651            _request: YamlLlmExecutionRequest,
5652            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5653        ) -> Result<YamlLlmExecutionResult, String> {
5654            self.call_count.fetch_add(1, Ordering::SeqCst);
5655            Ok(YamlLlmExecutionResult {
5656                payload: json!({"state":"ok"}),
5657                usage: None,
5658                ttft_ms: None,
5659                tool_calls: Vec::new(),
5660            })
5661        }
5662    }
5663
5664    impl YamlWorkflowEventSink for RecordingSink {
5665        fn emit(&self, event: &YamlWorkflowEvent) {
5666            self.events
5667                .lock()
5668                .expect("recording sink lock should not be poisoned")
5669                .push(event.clone());
5670        }
5671    }
5672
5673    #[async_trait]
5674    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
5675        async fn execute(
5676            &self,
5677            _handler: &str,
5678            _payload: &Value,
5679            _email_text: &str,
5680            context: &Value,
5681        ) -> Result<Value, String> {
5682            let mut guard = self
5683                .context
5684                .lock()
5685                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
5686            *guard = Some(context.clone());
5687            Ok(json!({"ok": true}))
5688        }
5689    }
5690
5691    #[async_trait]
5692    impl YamlWorkflowLlmExecutor for MockExecutor {
5693        async fn complete_structured(
5694            &self,
5695            request: YamlLlmExecutionRequest,
5696            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5697        ) -> Result<YamlLlmExecutionResult, String> {
5698            let prompt = request.prompt;
5699            if prompt.contains("exactly one category") {
5700                return Ok(YamlLlmExecutionResult {
5701                    payload: json!({"category":"termination","reason":"mock"}),
5702                    usage: Some(YamlLlmTokenUsage {
5703                        prompt_tokens: 10,
5704                        completion_tokens: 5,
5705                        total_tokens: 15,
5706                        reasoning_tokens: None,
5707                    }),
5708                    ttft_ms: None,
5709                    tool_calls: Vec::new(),
5710                });
5711            }
5712            if prompt.contains("Determine termination subtype") {
5713                return Ok(YamlLlmExecutionResult {
5714                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5715                    usage: Some(YamlLlmTokenUsage {
5716                        prompt_tokens: 12,
5717                        completion_tokens: 6,
5718                        total_tokens: 18,
5719                        reasoning_tokens: None,
5720                    }),
5721                    ttft_ms: None,
5722                    tool_calls: Vec::new(),
5723                });
5724            }
5725            if prompt.contains("Determine supply chain subtype") {
5726                return Ok(YamlLlmExecutionResult {
5727                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
5728                    usage: Some(YamlLlmTokenUsage {
5729                        prompt_tokens: 11,
5730                        completion_tokens: 4,
5731                        total_tokens: 15,
5732                        reasoning_tokens: None,
5733                    }),
5734                    ttft_ms: None,
5735                    tool_calls: Vec::new(),
5736                });
5737            }
5738            Err("unexpected prompt".to_string())
5739        }
5740    }
5741
5742    #[tokio::test]
5743    async fn runs_yaml_workflow_and_returns_step_timings() {
5744        let yaml = r#"
5745id: email-intake-classification
5746entry_node: classify_top_level
5747nodes:
5748  - id: classify_top_level
5749    node_type:
5750      llm_call:
5751        model: gpt-4.1
5752    config:
5753      prompt: |
5754        Classify this email into exactly one category:
5755        {{ input.email_text }}
5756  - id: route_top_level
5757    node_type:
5758      switch:
5759        branches:
5760          - condition: '$.nodes.classify_top_level.output.category == "termination"'
5761            target: classify_termination_subtype
5762        default: rag_clarification
5763  - id: classify_termination_subtype
5764    node_type:
5765      llm_call:
5766        model: gpt-4.1
5767    config:
5768      prompt: |
5769        Determine termination subtype:
5770        {{ input.email_text }}
5771  - id: route_termination_subtype
5772    node_type:
5773      switch:
5774        branches:
5775          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
5776            target: rag_termination_repeated_offense
5777        default: rag_clarification
5778  - id: rag_termination_repeated_offense
5779    node_type:
5780      custom_worker:
5781        handler: GetRagData
5782    config:
5783      payload:
5784        topic: termination_repeated_offense
5785  - id: rag_clarification
5786    node_type:
5787      custom_worker:
5788        handler: GetRagData
5789    config:
5790      payload:
5791        topic: clarification
5792edges:
5793  - from: classify_top_level
5794    to: route_top_level
5795  - from: classify_termination_subtype
5796    to: route_termination_subtype
5797"#;
5798
5799        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5800        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
5801            .await
5802            .expect("yaml workflow should execute");
5803
5804        assert_eq!(output.workflow_id, "email-intake-classification");
5805        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
5806        assert!(!output.step_timings.is_empty());
5807        assert_eq!(output.step_timings.len(), output.trace.len());
5808        assert!(output
5809            .outputs
5810            .contains_key("rag_termination_repeated_offense"));
5811        assert_eq!(output.total_input_tokens, 22);
5812        assert_eq!(output.total_output_tokens, 11);
5813        assert_eq!(output.total_tokens, 33);
5814    }
5815
5816    #[tokio::test]
5817    async fn emits_resolved_llm_input_event_with_bindings() {
5818        let yaml = r#"
5819id: email-intake-classification
5820entry_node: classify_top_level
5821nodes:
5822  - id: classify_top_level
5823    node_type:
5824      llm_call:
5825        model: gpt-4.1
5826    config:
5827      prompt: |
5828        Classify this email into exactly one category:
5829        {{ input.email_text }}
5830"#;
5831
5832        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5833        let sink = RecordingSink {
5834            events: Mutex::new(Vec::new()),
5835        };
5836
5837        let output = run_email_workflow_yaml_with_custom_worker_and_events(
5838            &workflow,
5839            "Need help with termination",
5840            &MockExecutor,
5841            None,
5842            Some(&sink),
5843        )
5844        .await
5845        .expect("yaml workflow should execute");
5846
5847        assert_eq!(output.terminal_node, "classify_top_level");
5848
5849        let events = sink
5850            .events
5851            .lock()
5852            .expect("recording sink lock should not be poisoned");
5853        let llm_event = events
5854            .iter()
5855            .find(|event| event.event_type == "node_llm_input_resolved")
5856            .expect("expected llm input telemetry event");
5857
5858        let metadata = llm_event
5859            .metadata
5860            .as_ref()
5861            .expect("llm input event must include metadata");
5862        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
5863        assert_eq!(metadata["stream_requested"], Value::Bool(false));
5864        assert_eq!(metadata["heal_requested"], Value::Bool(false));
5865        assert!(metadata["prompt"]
5866            .as_str()
5867            .expect("prompt should be a string")
5868            .contains("Need help with termination"));
5869
5870        let bindings = metadata["bindings"]
5871            .as_array()
5872            .expect("bindings should be an array");
5873        assert_eq!(bindings.len(), 1);
5874        assert_eq!(
5875            bindings[0]["source_path"],
5876            Value::String("input.email_text".to_string())
5877        );
5878        assert_eq!(
5879            bindings[0]["resolved"],
5880            Value::String("Need help with termination".to_string())
5881        );
5882        assert_eq!(bindings[0]["missing"], Value::Bool(false));
5883        assert_eq!(
5884            bindings[0]["resolved_type"],
5885            Value::String("string".to_string())
5886        );
5887    }
5888
5889    #[tokio::test]
5890    async fn workflow_completed_event_includes_nerdstats_by_default() {
5891        let yaml = r#"
5892id: nerdstats-default
5893entry_node: classify
5894nodes:
5895  - id: classify
5896    node_type:
5897      llm_call:
5898        model: gpt-4.1
5899    config:
5900      prompt: |
5901        Classify this email into exactly one category:
5902        {{ input.email_text }}
5903"#;
5904
5905        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5906        let sink = RecordingSink {
5907            events: Mutex::new(Vec::new()),
5908        };
5909
5910        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5911            &workflow,
5912            &json!({"email_text":"hello"}),
5913            &MockExecutor,
5914            None,
5915            Some(&sink),
5916            &YamlWorkflowRunOptions::default(),
5917        )
5918        .await
5919        .expect("workflow should execute");
5920
5921        let events = sink
5922            .events
5923            .lock()
5924            .expect("recording sink lock should not be poisoned");
5925        let completed = events
5926            .iter()
5927            .find(|event| event.event_type == "workflow_completed")
5928            .expect("expected workflow_completed event");
5929        let metadata = completed
5930            .metadata
5931            .as_ref()
5932            .expect("workflow_completed should include metadata by default");
5933        let nerdstats = metadata
5934            .get("nerdstats")
5935            .expect("nerdstats should be present by default");
5936
5937        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
5938        assert_eq!(
5939            nerdstats["terminal_node"],
5940            Value::String(output.terminal_node)
5941        );
5942        assert_eq!(
5943            nerdstats["total_tokens"],
5944            Value::Number(output.total_tokens.into())
5945        );
5946        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
5947        assert_eq!(
5948            nerdstats["token_metrics_source"],
5949            Value::String("provider_usage".to_string())
5950        );
5951        assert!(nerdstats.get("step_timings").is_none());
5952        assert!(nerdstats.get("llm_node_metrics").is_none());
5953        assert!(nerdstats.get("step_details").is_some());
5954        assert!(nerdstats.get("llm_node_models").is_none());
5955        assert_eq!(
5956            nerdstats["step_details"][0]["model_name"],
5957            Value::String("gpt-4.1".to_string())
5958        );
5959        assert_eq!(
5960            nerdstats["step_details"][0]["node_id"],
5961            Value::String("classify".to_string())
5962        );
5963        assert_eq!(nerdstats["ttft_ms"], Value::Null);
5964    }
5965
5966    #[tokio::test]
5967    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
5968        let yaml = r#"
5969id: nerdstats-disabled
5970entry_node: classify
5971nodes:
5972  - id: classify
5973    node_type:
5974      llm_call:
5975        model: gpt-4.1
5976    config:
5977      prompt: |
5978        Classify this email into exactly one category:
5979        {{ input.email_text }}
5980"#;
5981
5982        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5983        let sink = RecordingSink {
5984            events: Mutex::new(Vec::new()),
5985        };
5986        let options = YamlWorkflowRunOptions {
5987            telemetry: YamlWorkflowTelemetryConfig {
5988                nerdstats: false,
5989                ..YamlWorkflowTelemetryConfig::default()
5990            },
5991            ..YamlWorkflowRunOptions::default()
5992        };
5993
5994        run_workflow_yaml_with_custom_worker_and_events_and_options(
5995            &workflow,
5996            &json!({"email_text":"hello"}),
5997            &MockExecutor,
5998            None,
5999            Some(&sink),
6000            &options,
6001        )
6002        .await
6003        .expect("workflow should execute");
6004
6005        let events = sink
6006            .events
6007            .lock()
6008            .expect("recording sink lock should not be poisoned");
6009        let completed = events
6010            .iter()
6011            .find(|event| event.event_type == "workflow_completed")
6012            .expect("expected workflow_completed event");
6013        assert!(completed.metadata.is_none());
6014    }
6015
6016    struct StreamAwareMockExecutor;
6017
6018    #[async_trait]
6019    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
6020        async fn complete_structured(
6021            &self,
6022            request: YamlLlmExecutionRequest,
6023            _event_sink: Option<&dyn YamlWorkflowEventSink>,
6024        ) -> Result<YamlLlmExecutionResult, String> {
6025            Ok(YamlLlmExecutionResult {
6026                payload: json!({"state":"ok"}),
6027                usage: Some(YamlLlmTokenUsage {
6028                    prompt_tokens: 20,
6029                    completion_tokens: 10,
6030                    total_tokens: 30,
6031                    reasoning_tokens: None,
6032                }),
6033                ttft_ms: if request.stream { Some(12) } else { None },
6034                tool_calls: Vec::new(),
6035            })
6036        }
6037    }
6038
6039    #[tokio::test]
6040    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
6041        let yaml = r#"
6042id: nerdstats-streaming
6043entry_node: classify
6044nodes:
6045  - id: classify
6046    node_type:
6047      llm_call:
6048        model: gpt-4.1
6049        stream: true
6050    config:
6051      prompt: |
6052        Return JSON only:
6053        {"state":"ok"}
6054"#;
6055
6056        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6057        let sink = RecordingSink {
6058            events: Mutex::new(Vec::new()),
6059        };
6060
6061        run_workflow_yaml_with_custom_worker_and_events_and_options(
6062            &workflow,
6063            &json!({"email_text":"hello"}),
6064            &StreamAwareMockExecutor,
6065            None,
6066            Some(&sink),
6067            &YamlWorkflowRunOptions::default(),
6068        )
6069        .await
6070        .expect("workflow should execute");
6071
6072        let events = sink
6073            .events
6074            .lock()
6075            .expect("recording sink lock should not be poisoned");
6076        let completed = events
6077            .iter()
6078            .find(|event| event.event_type == "workflow_completed")
6079            .expect("expected workflow_completed event");
6080        let metadata = completed
6081            .metadata
6082            .as_ref()
6083            .expect("workflow_completed should include metadata by default");
6084        let nerdstats = metadata
6085            .get("nerdstats")
6086            .expect("nerdstats should be present by default");
6087
6088        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
6089        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
6090        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
6091        assert!(nerdstats.get("step_timings").is_none());
6092        assert!(nerdstats.get("llm_node_metrics").is_none());
6093        assert_eq!(
6094            nerdstats["step_details"][0]["model_name"],
6095            Value::String("gpt-4.1".to_string())
6096        );
6097        assert_eq!(
6098            nerdstats["step_details"][0]["node_id"],
6099            Value::String("classify".to_string())
6100        );
6101    }
6102
6103    #[test]
6104    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
6105        let output = YamlWorkflowRunOutput {
6106            workflow_id: "workflow".to_string(),
6107            entry_node: "start".to_string(),
6108            email_text: "hello".to_string(),
6109            trace: vec!["llm_node".to_string()],
6110            outputs: BTreeMap::new(),
6111            terminal_node: "llm_node".to_string(),
6112            terminal_output: None,
6113            step_timings: vec![YamlStepTiming {
6114                node_id: "llm_node".to_string(),
6115                node_kind: "llm_call".to_string(),
6116                model_name: Some("gpt-4.1".to_string()),
6117                elapsed_ms: 100,
6118                prompt_tokens: None,
6119                completion_tokens: None,
6120                total_tokens: None,
6121                reasoning_tokens: None,
6122                tokens_per_second: None,
6123            }],
6124            llm_node_metrics: BTreeMap::new(),
6125            llm_node_models: BTreeMap::new(),
6126            total_elapsed_ms: 100,
6127            ttft_ms: None,
6128            total_input_tokens: 0,
6129            total_output_tokens: 0,
6130            total_tokens: 0,
6131            total_reasoning_tokens: None,
6132            tokens_per_second: 0.0,
6133            trace_id: Some("trace-1".to_string()),
6134            metadata: None,
6135        };
6136
6137        let nerdstats = workflow_nerdstats(&output);
6138        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
6139        assert_eq!(
6140            nerdstats["token_metrics_source"],
6141            Value::String("provider_stream_usage_unavailable".to_string())
6142        );
6143        assert_eq!(nerdstats["total_tokens"], Value::Null);
6144        assert_eq!(nerdstats["ttft_ms"], Value::Null);
6145        assert!(nerdstats.get("step_timings").is_none());
6146        assert!(nerdstats.get("llm_node_metrics").is_none());
6147        assert_eq!(
6148            nerdstats["step_details"][0]["node_id"],
6149            Value::String("llm_node".to_string())
6150        );
6151        assert_eq!(
6152            nerdstats["step_details"][0]["model_name"],
6153            Value::String("gpt-4.1".to_string())
6154        );
6155    }
6156
6157    #[test]
6158    fn workflow_nerdstats_includes_ttft_when_available() {
6159        let output = YamlWorkflowRunOutput {
6160            workflow_id: "workflow".to_string(),
6161            entry_node: "start".to_string(),
6162            email_text: "hello".to_string(),
6163            trace: vec!["llm_node".to_string()],
6164            outputs: BTreeMap::new(),
6165            terminal_node: "llm_node".to_string(),
6166            terminal_output: None,
6167            step_timings: vec![YamlStepTiming {
6168                node_id: "llm_node".to_string(),
6169                node_kind: "llm_call".to_string(),
6170                model_name: Some("gpt-4.1".to_string()),
6171                elapsed_ms: 100,
6172                prompt_tokens: Some(10),
6173                completion_tokens: Some(15),
6174                total_tokens: Some(25),
6175                reasoning_tokens: None,
6176                tokens_per_second: Some(150.0),
6177            }],
6178            llm_node_metrics: BTreeMap::new(),
6179            llm_node_models: BTreeMap::new(),
6180            total_elapsed_ms: 100,
6181            ttft_ms: Some(42),
6182            total_input_tokens: 10,
6183            total_output_tokens: 15,
6184            total_tokens: 25,
6185            total_reasoning_tokens: None,
6186            tokens_per_second: 150.0,
6187            trace_id: Some("trace-2".to_string()),
6188            metadata: None,
6189        };
6190
6191        let nerdstats = workflow_nerdstats(&output);
6192        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
6193        assert!(nerdstats.get("step_timings").is_none());
6194        assert!(nerdstats.get("llm_node_metrics").is_none());
6195        assert_eq!(
6196            nerdstats["step_details"][0]["node_id"],
6197            Value::String("llm_node".to_string())
6198        );
6199        assert_eq!(
6200            nerdstats["step_details"][0]["model_name"],
6201            Value::String("gpt-4.1".to_string())
6202        );
6203    }
6204
6205    #[test]
6206    fn workflow_nerdstats_schema_contract_is_stable() {
6207        let output = YamlWorkflowRunOutput {
6208            workflow_id: "schema-workflow".to_string(),
6209            entry_node: "start".to_string(),
6210            email_text: "hello".to_string(),
6211            trace: vec!["classify".to_string(), "route".to_string()],
6212            outputs: BTreeMap::new(),
6213            terminal_node: "route".to_string(),
6214            terminal_output: None,
6215            step_timings: vec![
6216                YamlStepTiming {
6217                    node_id: "classify".to_string(),
6218                    node_kind: "llm_call".to_string(),
6219                    model_name: Some("gpt-4.1".to_string()),
6220                    elapsed_ms: 100,
6221                    prompt_tokens: Some(11),
6222                    completion_tokens: Some(22),
6223                    total_tokens: Some(33),
6224                    reasoning_tokens: Some(7),
6225                    tokens_per_second: Some(220.0),
6226                },
6227                YamlStepTiming {
6228                    node_id: "route".to_string(),
6229                    node_kind: "switch".to_string(),
6230                    model_name: None,
6231                    elapsed_ms: 0,
6232                    prompt_tokens: None,
6233                    completion_tokens: None,
6234                    total_tokens: None,
6235                    reasoning_tokens: None,
6236                    tokens_per_second: None,
6237                },
6238            ],
6239            llm_node_metrics: BTreeMap::new(),
6240            llm_node_models: BTreeMap::new(),
6241            total_elapsed_ms: 100,
6242            ttft_ms: Some(9),
6243            total_input_tokens: 11,
6244            total_output_tokens: 22,
6245            total_tokens: 33,
6246            total_reasoning_tokens: Some(7),
6247            tokens_per_second: 220.0,
6248            trace_id: Some("trace-schema".to_string()),
6249            metadata: None,
6250        };
6251
6252        let nerdstats = workflow_nerdstats(&output);
6253        let expected = json!({
6254            "workflow_id": "schema-workflow",
6255            "terminal_node": "route",
6256            "total_elapsed_ms": 100,
6257            "ttft_ms": 9,
6258            "step_details": [
6259                {
6260                    "node_id": "classify",
6261                    "node_kind": "llm_call",
6262                    "model_name": "gpt-4.1",
6263                    "elapsed_ms": 100,
6264                    "prompt_tokens": 11,
6265                    "completion_tokens": 22,
6266                    "total_tokens": 33,
6267                    "reasoning_tokens": 7,
6268                    "tokens_per_second": 220.0
6269                },
6270                {
6271                    "node_id": "route",
6272                    "node_kind": "switch",
6273                    "elapsed_ms": 0
6274                }
6275            ],
6276            "total_input_tokens": 11,
6277            "total_output_tokens": 22,
6278            "total_tokens": 33,
6279            "total_reasoning_tokens": 7,
6280            "tokens_per_second": 220.0,
6281            "trace_id": "trace-schema",
6282            "token_metrics_available": true,
6283            "token_metrics_source": "provider_usage",
6284            "llm_nodes_without_usage": []
6285        });
6286
6287        assert_eq!(nerdstats, expected);
6288    }
6289
6290    struct MessageHistoryExecutor;
6291
6292    #[async_trait]
6293    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
6294        async fn complete_structured(
6295            &self,
6296            request: YamlLlmExecutionRequest,
6297            _event_sink: Option<&dyn YamlWorkflowEventSink>,
6298        ) -> Result<YamlLlmExecutionResult, String> {
6299            let messages = request
6300                .messages
6301                .ok_or_else(|| "expected messages in request".to_string())?;
6302            if messages.len() != 2 {
6303                return Err(format!("expected 2 messages, got {}", messages.len()));
6304            }
6305            Ok(YamlLlmExecutionResult {
6306                payload: json!({"category":"termination","reason":"history"}),
6307                usage: Some(YamlLlmTokenUsage {
6308                    prompt_tokens: 7,
6309                    completion_tokens: 3,
6310                    total_tokens: 10,
6311                    reasoning_tokens: None,
6312                }),
6313                ttft_ms: None,
6314                tool_calls: Vec::new(),
6315            })
6316        }
6317    }
6318
6319    #[tokio::test]
6320    async fn supports_messages_path_in_workflow_input() {
6321        let yaml = r#"
6322id: email-intake-classification
6323entry_node: classify_top_level
6324nodes:
6325  - id: classify_top_level
6326    node_type:
6327      llm_call:
6328        model: gpt-4.1
6329        messages_path: input.messages
6330        append_prompt_as_user: false
6331"#;
6332
6333        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6334        let input = json!({
6335            "email_text": "ignored",
6336            "messages": [
6337                {"role": "system", "content": "You are a classifier"},
6338                {"role": "user", "content": "Please classify this"}
6339            ]
6340        });
6341
6342        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6343            .await
6344            .expect("workflow should use chat history from input");
6345
6346        assert_eq!(output.terminal_node, "classify_top_level");
6347        assert_eq!(
6348            output.outputs["classify_top_level"]["output"]["reason"],
6349            Value::String("history".to_string())
6350        );
6351    }
6352
6353    #[tokio::test]
6354    async fn wrapper_entrypoints_produce_equivalent_outputs() {
6355        let yaml = r#"
6356id: wrapper-equivalence
6357entry_node: classify
6358nodes:
6359  - id: classify
6360    node_type:
6361      llm_call:
6362        model: gpt-4.1
6363    config:
6364      prompt: |
6365        Classify this email into exactly one category:
6366        {{ input.email_text }}
6367"#;
6368
6369        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6370        let input = json!({"email_text":"hello"});
6371
6372        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
6373            .await
6374            .expect("base entrypoint should execute");
6375        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
6376            .await
6377            .expect("custom worker wrapper should execute");
6378        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
6379            &workflow,
6380            &input,
6381            &MockExecutor,
6382            None,
6383            None,
6384            &YamlWorkflowRunOptions::default(),
6385        )
6386        .await
6387        .expect("events/options wrapper should execute");
6388
6389        assert_eq!(a.workflow_id, b.workflow_id);
6390        assert_eq!(a.workflow_id, c.workflow_id);
6391        assert_eq!(a.terminal_node, b.terminal_node);
6392        assert_eq!(a.terminal_node, c.terminal_node);
6393        assert_eq!(a.outputs, b.outputs);
6394        assert_eq!(a.outputs, c.outputs);
6395        assert_eq!(a.total_tokens, b.total_tokens);
6396        assert_eq!(a.total_tokens, c.total_tokens);
6397    }
6398
6399    #[tokio::test]
6400    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
6401        let yaml = r#"
6402id: tool-calling-workflow
6403entry_node: generate_with_tool
6404nodes:
6405  - id: generate_with_tool
6406    node_type:
6407      llm_call:
6408        model: gpt-4.1
6409        tools_format: simplified
6410        max_tool_roundtrips: 1
6411        tool_calls_global_key: audit
6412        tools:
6413          - name: get_customer_context
6414            description: Fetch customer context
6415            input_schema:
6416              type: object
6417              properties:
6418                order_id: { type: string }
6419              required: [order_id]
6420              additionalProperties: false
6421            output_schema:
6422              type: object
6423              properties:
6424                customer_name: { type: string }
6425              required: [customer_name]
6426              additionalProperties: false
6427    config:
6428      output_schema:
6429        type: object
6430        properties:
6431          state: { type: string }
6432        required: [state]
6433  - id: personalize
6434    node_type:
6435      llm_call:
6436        model: gpt-4.1
6437    config:
6438      prompt: |
6439        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
6440      output_schema:
6441        type: object
6442        properties:
6443          subject: { type: string }
6444          body: { type: string }
6445        required: [subject, body]
6446edges:
6447  - from: generate_with_tool
6448    to: personalize
6449"#;
6450
6451        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6452        let client = SimpleAgentsClientBuilder::new()
6453            .with_provider(Arc::new(ToolLoopProvider))
6454            .build()
6455            .expect("client should build");
6456        let worker = FixedToolWorker {
6457            payload: json!({"customer_name": "Ava"}),
6458        };
6459
6460        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6461            &workflow,
6462            &json!({"email_text":"hello"}),
6463            &client,
6464            Some(&worker),
6465            None,
6466            &YamlWorkflowRunOptions::default(),
6467        )
6468        .await
6469        .expect("workflow should execute");
6470
6471        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
6472        assert_eq!(
6473            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
6474            Value::String("Ava".to_string())
6475        );
6476        let body = output.outputs["personalize"]["output"]["body"]
6477            .as_str()
6478            .expect("body should be string");
6479        assert!(body.contains("Ava"));
6480    }
6481
6482    #[tokio::test]
6483    async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
6484        let yaml = r#"
6485id: reasoning-usage-workflow
6486entry_node: classify
6487nodes:
6488  - id: classify
6489    node_type:
6490      llm_call:
6491        model: gpt-4.1
6492    config:
6493      prompt: |
6494        Return JSON only:
6495        {"state":"ok"}
6496      output_schema:
6497        type: object
6498        properties:
6499          state: { type: string }
6500        required: [state]
6501"#;
6502
6503        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6504        let client = SimpleAgentsClientBuilder::new()
6505            .with_provider(Arc::new(ReasoningUsageProvider))
6506            .build()
6507            .expect("client should build");
6508
6509        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6510            &workflow,
6511            &json!({"email_text":"hello"}),
6512            &client,
6513            None,
6514            None,
6515            &YamlWorkflowRunOptions::default(),
6516        )
6517        .await
6518        .expect("workflow should execute");
6519
6520        assert_eq!(output.total_reasoning_tokens, Some(4));
6521        assert_eq!(output.step_timings.len(), 1);
6522        assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));
6523
6524        let nerdstats = workflow_nerdstats(&output);
6525        assert_eq!(
6526            nerdstats["total_reasoning_tokens"],
6527            Value::Number(4u64.into())
6528        );
6529        assert_eq!(
6530            nerdstats["step_details"][0]["reasoning_tokens"],
6531            Value::Number(4u64.into())
6532        );
6533    }
6534
6535    #[tokio::test]
6536    async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
6537        let yaml = r#"
6538id: tool-reasoning-workflow
6539entry_node: generate_with_tool
6540nodes:
6541  - id: generate_with_tool
6542    node_type:
6543      llm_call:
6544        model: gpt-4.1
6545        tools_format: simplified
6546        max_tool_roundtrips: 1
6547        tools:
6548          - name: get_customer_context
6549            input_schema:
6550              type: object
6551              properties:
6552                order_id: { type: string }
6553              required: [order_id]
6554              additionalProperties: false
6555            output_schema:
6556              type: object
6557              properties:
6558                customer_name: { type: string }
6559              required: [customer_name]
6560              additionalProperties: false
6561    config:
6562      output_schema:
6563        type: object
6564        properties:
6565          state: { type: string }
6566        required: [state]
6567"#;
6568
6569        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6570        let client = SimpleAgentsClientBuilder::new()
6571            .with_provider(Arc::new(ToolLoopReasoningProvider))
6572            .build()
6573            .expect("client should build");
6574        let worker = FixedToolWorker {
6575            payload: json!({"customer_name": "Ava"}),
6576        };
6577
6578        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6579            &workflow,
6580            &json!({"email_text":"hello"}),
6581            &client,
6582            Some(&worker),
6583            None,
6584            &YamlWorkflowRunOptions::default(),
6585        )
6586        .await
6587        .expect("workflow should execute");
6588
6589        assert_eq!(output.total_reasoning_tokens, Some(5));
6590        assert_eq!(output.step_timings.len(), 1);
6591        assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));
6592
6593        let nerdstats = workflow_nerdstats(&output);
6594        assert_eq!(
6595            nerdstats["total_reasoning_tokens"],
6596            Value::Number(5u64.into())
6597        );
6598        assert_eq!(
6599            nerdstats["step_details"][0]["reasoning_tokens"],
6600            Value::Number(5u64.into())
6601        );
6602    }
6603
6604    #[tokio::test]
6605    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
6606        let yaml = r#"
6607id: tool-calling-schema-fail
6608entry_node: generate_with_tool
6609nodes:
6610  - id: generate_with_tool
6611    node_type:
6612      llm_call:
6613        model: gpt-4.1
6614        tools_format: simplified
6615        max_tool_roundtrips: 1
6616        tools:
6617          - name: get_customer_context
6618            input_schema:
6619              type: object
6620              properties:
6621                order_id: { type: string }
6622              required: [order_id]
6623              additionalProperties: false
6624            output_schema:
6625              type: object
6626              properties:
6627                customer_name: { type: string }
6628              required: [customer_name]
6629              additionalProperties: false
6630    config:
6631      output_schema:
6632        type: object
6633        properties:
6634          state: { type: string }
6635        required: [state]
6636"#;
6637
6638        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6639        let client = SimpleAgentsClientBuilder::new()
6640            .with_provider(Arc::new(ToolLoopProvider))
6641            .build()
6642            .expect("client should build");
6643        let worker = FixedToolWorker {
6644            payload: json!({"unexpected": "shape"}),
6645        };
6646
6647        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6648            &workflow,
6649            &json!({"email_text":"hello"}),
6650            &client,
6651            Some(&worker),
6652            None,
6653            &YamlWorkflowRunOptions::default(),
6654        )
6655        .await
6656        .expect_err("workflow should hard-fail on schema mismatch");
6657
6658        match error {
6659            YamlWorkflowRunError::Llm { message, .. } => {
6660                assert!(message.contains("output failed schema validation"));
6661            }
6662            other => panic!("expected llm error, got {other:?}"),
6663        }
6664    }
6665
6666    #[tokio::test]
6667    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
6668        let yaml = r#"
6669id: unknown-tool-rejected
6670entry_node: generate_with_tool
6671nodes:
6672  - id: generate_with_tool
6673    node_type:
6674      llm_call:
6675        model: gpt-4.1
6676        tools_format: simplified
6677        max_tool_roundtrips: 1
6678        tools:
6679          - name: get_customer_context
6680            input_schema:
6681              type: object
6682              properties:
6683                order_id: { type: string }
6684              required: [order_id]
6685    config:
6686      output_schema:
6687        type: object
6688        properties:
6689          state: { type: string }
6690        required: [state]
6691"#;
6692
6693        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6694        let client = SimpleAgentsClientBuilder::new()
6695            .with_provider(Arc::new(UnknownToolProvider))
6696            .build()
6697            .expect("client should build");
6698        let worker = CountingToolWorker {
6699            execute_calls: AtomicUsize::new(0),
6700        };
6701
6702        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
6703            &workflow,
6704            &json!({"email_text":"hello"}),
6705            &client,
6706            Some(&worker),
6707            None,
6708            &YamlWorkflowRunOptions::default(),
6709        )
6710        .await
6711        .expect_err("workflow should reject unknown tool before executing worker");
6712
6713        match error {
6714            YamlWorkflowRunError::Llm { message, .. } => {
6715                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
6716            }
6717            other => panic!("expected llm error, got {other:?}"),
6718        }
6719
6720        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
6721    }
6722
6723    #[test]
6724    fn validates_tools_format_mismatch() {
6725        let yaml = r#"
6726id: mismatch
6727entry_node: generate
6728nodes:
6729  - id: generate
6730    node_type:
6731      llm_call:
6732        model: gpt-4.1
6733        tools_format: openai
6734        tools:
6735          - name: get_customer_context
6736            input_schema:
6737              type: object
6738              properties:
6739                order_id: { type: string }
6740              required: [order_id]
6741"#;
6742
6743        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6744        let diagnostics = verify_yaml_workflow(&workflow);
6745        assert!(diagnostics
6746            .iter()
6747            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
6748    }
6749
6750    #[test]
6751    fn mock_custom_worker_supports_get_employee_record() {
6752        let result = mock_custom_worker_output(
6753            "get_employee_record",
6754            &json!({"employee_name": "Alex Johnson"}),
6755        )
6756        .expect("mock tool should resolve employee record");
6757
6758        assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
6759        assert_eq!(result["location"], Value::String("Austin".to_string()));
6760    }
6761
6762    #[tokio::test]
6763    async fn custom_worker_receives_trace_context_block() {
6764        let yaml = r#"
6765id: custom-worker-trace-context
6766entry_node: lookup
6767nodes:
6768  - id: lookup
6769    node_type:
6770      custom_worker:
6771        handler: GetRagData
6772    config:
6773      payload:
6774        topic: demo
6775"#;
6776
6777        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6778        let worker = CapturingWorker {
6779            context: Mutex::new(None),
6780        };
6781        let options = YamlWorkflowRunOptions {
6782            trace: YamlWorkflowTraceOptions {
6783                context: Some(YamlWorkflowTraceContextInput {
6784                    trace_id: Some("trace-fixed-ctx".to_string()),
6785                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
6786                    ..YamlWorkflowTraceContextInput::default()
6787                }),
6788                tenant: YamlWorkflowTraceTenantContext {
6789                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
6790                    request_id: Some("turn-7".to_string()),
6791                    ..YamlWorkflowTraceTenantContext::default()
6792                },
6793            },
6794            ..YamlWorkflowRunOptions::default()
6795        };
6796
6797        run_workflow_yaml_with_custom_worker_and_events_and_options(
6798            &workflow,
6799            &json!({"email_text":"hello"}),
6800            &MockExecutor,
6801            Some(&worker),
6802            None,
6803            &options,
6804        )
6805        .await
6806        .expect("workflow should execute");
6807
6808        let captured_context = worker
6809            .context
6810            .lock()
6811            .expect("capturing worker lock should not be poisoned")
6812            .clone()
6813            .expect("custom worker should receive context");
6814
6815        assert_eq!(
6816            captured_context
6817                .get("trace")
6818                .and_then(|trace| trace.get("context"))
6819                .and_then(|context| context.get("trace_id"))
6820                .and_then(Value::as_str),
6821            Some("trace-fixed-ctx")
6822        );
6823        assert_eq!(
6824            captured_context
6825                .get("trace")
6826                .and_then(|trace| trace.get("context"))
6827                .and_then(|context| context.get("traceparent"))
6828                .and_then(Value::as_str),
6829            Some("00-trace-fixed-ctx-span-fixed-01")
6830        );
6831        assert_eq!(
6832            captured_context
6833                .get("trace")
6834                .and_then(|trace| trace.get("tenant"))
6835                .and_then(|tenant| tenant.get("conversation_id"))
6836                .and_then(Value::as_str),
6837            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
6838        );
6839    }
6840
6841    #[tokio::test]
6842    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
6843        let yaml = r#"
6844id: cancellation-test
6845entry_node: classify
6846nodes:
6847  - id: classify
6848    node_type:
6849      llm_call:
6850        model: gpt-4.1
6851    config:
6852      prompt: |
6853        Classify this email into exactly one category:
6854        {{ input.email_text }}
6855"#;
6856
6857        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6858        let executor = CountingExecutor {
6859            call_count: AtomicUsize::new(0),
6860        };
6861        let sink = CancelAfterFirstEventSink {
6862            cancelled: AtomicBool::new(false),
6863        };
6864
6865        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6866            &workflow,
6867            &json!({"email_text":"hello"}),
6868            &executor,
6869            None,
6870            Some(&sink),
6871            &YamlWorkflowRunOptions::default(),
6872        )
6873        .await
6874        .expect_err("workflow should stop when sink signals cancellation");
6875
6876        assert!(matches!(
6877            err,
6878            YamlWorkflowRunError::EventSinkCancelled { .. }
6879        ));
6880        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
6881    }
6882
6883    #[tokio::test]
6884    async fn rejects_invalid_messages_path_shape() {
6885        let yaml = r#"
6886id: email-intake-classification
6887entry_node: classify_top_level
6888nodes:
6889  - id: classify_top_level
6890    node_type:
6891      llm_call:
6892        model: gpt-4.1
6893        messages_path: input.messages
6894"#;
6895
6896        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6897        let input = json!({
6898            "email_text": "ignored",
6899            "messages": "not-a-list"
6900        });
6901
6902        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6903            .await
6904            .expect_err("workflow should fail for invalid messages shape");
6905
6906        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
6907    }
6908
6909    #[test]
6910    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
6911        let yaml = r#"
6912id: chat-workflow
6913entry_node: decide
6914nodes:
6915  - id: decide
6916    node_type:
6917      switch:
6918        branches:
6919          - condition: '$.input.mode == "draft"'
6920            target: draft
6921        default: ask
6922  - id: draft
6923    node_type:
6924      llm_call:
6925        model: gpt-4.1
6926  - id: ask
6927    node_type:
6928      llm_call:
6929        model: gpt-4.1
6930edges:
6931  - from: draft
6932    to: ask
6933"#;
6934
6935        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6936        let mermaid = yaml_workflow_to_mermaid(&workflow);
6937
6938        assert!(mermaid.contains("flowchart TD"));
6939        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
6940        assert!(mermaid.contains("decide -- \"default\" --> ask"));
6941        assert!(mermaid.contains("draft --> ask"));
6942    }
6943
6944    #[test]
6945    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
6946        let yaml = r#"
6947id: tool-graph
6948entry_node: chat
6949nodes:
6950  - id: chat
6951    node_type:
6952      llm_call:
6953        model: gemini-3-flash
6954        tools_format: simplified
6955        tools:
6956          - name: run_workflow_graph
6957            input_schema:
6958              type: object
6959              properties:
6960                workflow_id: { type: string }
6961              required: [workflow_id]
6962edges: []
6963"#;
6964
6965        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6966        let mermaid = yaml_workflow_to_mermaid(&workflow);
6967
6968        assert!(mermaid.contains("chat__tool_0"));
6969        assert!(mermaid.contains("chat -.-> chat__tool_0"));
6970        assert!(mermaid.contains("classDef toolNode"));
6971        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
6972    }
6973
6974    #[test]
6975    fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6976        let base_dir = std::env::temp_dir().join(format!(
6977            "simple_agents_mermaid_subgraph_{}",
6978            std::time::SystemTime::now()
6979                .duration_since(std::time::UNIX_EPOCH)
6980                .expect("unix epoch")
6981                .as_nanos()
6982        ));
6983        fs::create_dir_all(&base_dir).expect("temp dir should be created");
6984
6985        let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6986        let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6987
6988        let orchestrator_yaml = r#"
6989id: email-chat-orchestrator-with-subgraph-tool
6990entry_node: respond_casual
6991nodes:
6992  - id: respond_casual
6993    node_type:
6994      llm_call:
6995        model: gemini-3-flash
6996        tools_format: simplified
6997        tools:
6998          - name: run_workflow_graph
6999            input_schema:
7000              type: object
7001              properties:
7002                workflow_id: { type: string }
7003              required: [workflow_id]
7004    config:
7005      prompt: |
7006        Call with:
7007        {
7008          "workflow_id": "hr_warning_email_subgraph"
7009        }
7010edges: []
7011"#;
7012
7013        let subgraph_yaml = r#"
7014id: hr-warning-email-subgraph
7015entry_node: draft_hr_warning_email
7016nodes:
7017  - id: draft_hr_warning_email
7018    node_type:
7019      llm_call:
7020        model: gemini-3-flash
7021edges: []
7022"#;
7023
7024        fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
7025        fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
7026
7027        let mermaid =
7028            yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
7029
7030        assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
7031        assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
7032        assert!(mermaid.contains("calls hr_warning_email_subgraph"));
7033        assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
7034
7035        fs::remove_dir_all(base_dir).expect("temp dir removed");
7036    }
7037
7038    #[test]
7039    fn converts_yaml_workflow_to_ir_definition() {
7040        let yaml = r#"
7041id: chat-workflow
7042entry_node: classify
7043nodes:
7044  - id: classify
7045    node_type:
7046      llm_call:
7047        model: gpt-4.1
7048    config:
7049      prompt: |
7050        classify
7051  - id: route
7052    node_type:
7053      switch:
7054        branches:
7055          - condition: '$.nodes.classify.output.kind == "x"'
7056            target: done
7057        default: done
7058  - id: done
7059    node_type:
7060      custom_worker:
7061        handler: GetRagData
7062    config:
7063      payload:
7064        topic: test
7065edges:
7066  - from: classify
7067    to: route
7068"#;
7069
7070        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7071        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
7072
7073        assert_eq!(ir.name, "chat-workflow");
7074        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
7075        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
7076        assert!(ir.nodes.iter().any(|n| n.id == "route"));
7077        assert!(ir.nodes.iter().any(|n| n.id == "done"));
7078    }
7079
7080    #[test]
7081    fn supports_yaml_to_ir_when_messages_path_is_used() {
7082        let yaml = r#"
7083id: chat-workflow
7084entry_node: classify
7085nodes:
7086  - id: classify
7087    node_type:
7088      llm_call:
7089        model: gpt-4.1
7090        messages_path: input.messages
7091"#;
7092
7093        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7094        let ir =
7095            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
7096        assert!(ir.nodes.iter().any(|node| matches!(
7097            node.kind,
7098            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
7099        )));
7100    }
7101
7102    #[tokio::test]
7103    async fn workflow_output_contains_trace_id_in_both_locations() {
7104        let yaml = r#"
7105id: trace-test
7106entry_node: classify
7107nodes:
7108  - id: classify
7109    node_type:
7110      llm_call:
7111        model: gpt-4.1
7112    config:
7113      prompt: |
7114        Classify this email into exactly one category:
7115        {{ input.email_text }}
7116"#;
7117
7118        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7119        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
7120            .await
7121            .expect("workflow should execute");
7122
7123        let trace_id = output
7124            .trace_id
7125            .as_deref()
7126            .expect("trace_id should be present");
7127        assert!(!trace_id.is_empty());
7128        assert_eq!(
7129            output.metadata.as_ref().and_then(|value| {
7130                value
7131                    .get("telemetry")
7132                    .and_then(|telemetry| telemetry.get("trace_id"))
7133                    .and_then(Value::as_str)
7134            }),
7135            Some(trace_id)
7136        );
7137    }
7138
7139    #[tokio::test]
7140    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
7141        let yaml = r#"
7142id: sample-rate-zero
7143entry_node: classify
7144nodes:
7145  - id: classify
7146    node_type:
7147      llm_call:
7148        model: gpt-4.1
7149    config:
7150      prompt: |
7151        Classify this email into exactly one category:
7152        {{ input.email_text }}
7153"#;
7154
7155        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7156        let options = YamlWorkflowRunOptions {
7157            telemetry: YamlWorkflowTelemetryConfig {
7158                sample_rate: 0.0,
7159                ..YamlWorkflowTelemetryConfig::default()
7160            },
7161            trace: YamlWorkflowTraceOptions {
7162                context: Some(YamlWorkflowTraceContextInput {
7163                    trace_id: Some("trace-sample-zero".to_string()),
7164                    ..YamlWorkflowTraceContextInput::default()
7165                }),
7166                tenant: YamlWorkflowTraceTenantContext::default(),
7167            },
7168            model: None,
7169        };
7170
7171        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7172            &workflow,
7173            &json!({"email_text":"hello"}),
7174            &MockExecutor,
7175            None,
7176            None,
7177            &options,
7178        )
7179        .await
7180        .expect("workflow should execute");
7181
7182        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
7183        assert_eq!(
7184            output
7185                .metadata
7186                .as_ref()
7187                .and_then(|value| value.get("telemetry"))
7188                .and_then(|telemetry| telemetry.get("sampled"))
7189                .and_then(Value::as_bool),
7190            Some(false)
7191        );
7192    }
7193
7194    #[test]
7195    fn trace_id_from_traceparent_parses_w3c_header() {
7196        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
7197        assert_eq!(
7198            trace_id_from_traceparent(traceparent).as_deref(),
7199            Some("4bf92f3577b34da6a3ce929d0e0e4736")
7200        );
7201    }
7202
7203    #[test]
7204    fn resolve_telemetry_context_marks_traceparent_source() {
7205        let mut options = YamlWorkflowRunOptions::default();
7206        options.trace.context = Some(YamlWorkflowTraceContextInput {
7207            traceparent: Some(
7208                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
7209            ),
7210            ..YamlWorkflowTraceContextInput::default()
7211        });
7212
7213        let context = resolve_telemetry_context(&options, None);
7214
7215        assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
7216        assert_eq!(
7217            context.trace_id.as_deref(),
7218            Some("4bf92f3577b34da6a3ce929d0e0e4736")
7219        );
7220    }
7221
7222    #[tokio::test]
7223    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
7224        let yaml = r#"
7225id: traceparent-derived
7226entry_node: classify
7227nodes:
7228  - id: classify
7229    node_type:
7230      llm_call:
7231        model: gpt-4.1
7232    config:
7233      prompt: |
7234        Classify this email into exactly one category:
7235        {{ input.email_text }}
7236"#;
7237
7238        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7239        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
7240        let options = YamlWorkflowRunOptions {
7241            telemetry: YamlWorkflowTelemetryConfig {
7242                sample_rate: 0.0,
7243                ..YamlWorkflowTelemetryConfig::default()
7244            },
7245            trace: YamlWorkflowTraceOptions {
7246                context: Some(YamlWorkflowTraceContextInput {
7247                    traceparent: Some(traceparent.to_string()),
7248                    ..YamlWorkflowTraceContextInput::default()
7249                }),
7250                tenant: YamlWorkflowTraceTenantContext::default(),
7251            },
7252            model: None,
7253        };
7254
7255        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7256            &workflow,
7257            &json!({"email_text":"hello"}),
7258            &MockExecutor,
7259            None,
7260            None,
7261            &options,
7262        )
7263        .await
7264        .expect("workflow should execute");
7265
7266        assert_eq!(
7267            output.trace_id.as_deref(),
7268            Some("4bf92f3577b34da6a3ce929d0e0e4736")
7269        );
7270        assert_eq!(
7271            output
7272                .metadata
7273                .as_ref()
7274                .and_then(|value| value.get("telemetry"))
7275                .and_then(|telemetry| telemetry.get("sampled"))
7276                .and_then(Value::as_bool),
7277            Some(false)
7278        );
7279    }
7280
7281    #[tokio::test]
7282    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
7283        let yaml = r#"
7284id: sample-rate-one
7285entry_node: classify
7286nodes:
7287  - id: classify
7288    node_type:
7289      llm_call:
7290        model: gpt-4.1
7291    config:
7292      prompt: |
7293        Classify this email into exactly one category:
7294        {{ input.email_text }}
7295"#;
7296
7297        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7298        let options = YamlWorkflowRunOptions {
7299            telemetry: YamlWorkflowTelemetryConfig {
7300                sample_rate: 1.0,
7301                ..YamlWorkflowTelemetryConfig::default()
7302            },
7303            trace: YamlWorkflowTraceOptions {
7304                context: Some(YamlWorkflowTraceContextInput {
7305                    trace_id: Some("trace-sample-one".to_string()),
7306                    ..YamlWorkflowTraceContextInput::default()
7307                }),
7308                tenant: YamlWorkflowTraceTenantContext::default(),
7309            },
7310            model: None,
7311        };
7312
7313        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7314            &workflow,
7315            &json!({"email_text":"hello"}),
7316            &MockExecutor,
7317            None,
7318            None,
7319            &options,
7320        )
7321        .await
7322        .expect("workflow should execute");
7323
7324        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
7325        assert_eq!(
7326            output
7327                .metadata
7328                .as_ref()
7329                .and_then(|value| value.get("telemetry"))
7330                .and_then(|telemetry| telemetry.get("sampled"))
7331                .and_then(Value::as_bool),
7332            Some(true)
7333        );
7334    }
7335
7336    #[tokio::test]
7337    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
7338        let yaml = r#"
7339id: sample-rate-deterministic
7340entry_node: classify
7341nodes:
7342  - id: classify
7343    node_type:
7344      llm_call:
7345        model: gpt-4.1
7346    config:
7347      prompt: |
7348        Classify this email into exactly one category:
7349        {{ input.email_text }}
7350"#;
7351
7352        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7353        let options = YamlWorkflowRunOptions {
7354            telemetry: YamlWorkflowTelemetryConfig {
7355                sample_rate: 0.5,
7356                ..YamlWorkflowTelemetryConfig::default()
7357            },
7358            trace: YamlWorkflowTraceOptions {
7359                context: Some(YamlWorkflowTraceContextInput {
7360                    trace_id: Some("trace-sample-deterministic".to_string()),
7361                    ..YamlWorkflowTraceContextInput::default()
7362                }),
7363                tenant: YamlWorkflowTraceTenantContext::default(),
7364            },
7365            model: None,
7366        };
7367
7368        let mut sampled_values = Vec::new();
7369        for _ in 0..3 {
7370            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7371                &workflow,
7372                &json!({"email_text":"hello"}),
7373                &MockExecutor,
7374                None,
7375                None,
7376                &options,
7377            )
7378            .await
7379            .expect("workflow should execute");
7380
7381            let sampled = output
7382                .metadata
7383                .as_ref()
7384                .and_then(|value| value.get("telemetry"))
7385                .and_then(|telemetry| telemetry.get("sampled"))
7386                .and_then(Value::as_bool)
7387                .expect("sampled flag should be present");
7388            sampled_values.push(sampled);
7389        }
7390
7391        assert!(sampled_values
7392            .iter()
7393            .all(|value| *value == sampled_values[0]));
7394    }
7395
7396    #[tokio::test]
7397    async fn workflow_run_options_reject_invalid_sample_rate() {
7398        let yaml = r#"
7399id: sample-rate-invalid
7400entry_node: classify
7401nodes:
7402  - id: classify
7403    node_type:
7404      llm_call:
7405        model: gpt-4.1
7406    config:
7407      prompt: |
7408        Classify this email into exactly one category:
7409        {{ input.email_text }}
7410"#;
7411
7412        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7413        let options = YamlWorkflowRunOptions {
7414            telemetry: YamlWorkflowTelemetryConfig {
7415                sample_rate: 1.1,
7416                ..YamlWorkflowTelemetryConfig::default()
7417            },
7418            ..YamlWorkflowRunOptions::default()
7419        };
7420
7421        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
7422            &workflow,
7423            &json!({"email_text":"hello"}),
7424            &MockExecutor,
7425            None,
7426            None,
7427            &options,
7428        )
7429        .await
7430        .expect_err("invalid sample_rate should fail");
7431
7432        match err {
7433            YamlWorkflowRunError::InvalidInput { message } => {
7434                assert!(message.contains("telemetry.sample_rate"));
7435            }
7436            _ => panic!("expected invalid input error for sample_rate"),
7437        }
7438    }
7439
7440    #[tokio::test]
7441    async fn workflow_run_options_reject_nan_sample_rate() {
7442        let yaml = r#"
7443id: sample-rate-invalid
7444entry_node: classify
7445nodes:
7446  - id: classify
7447    node_type:
7448      llm_call:
7449        model: gpt-4.1
7450    config:
7451      prompt: |
7452        Classify this email into exactly one category:
7453        {{ input.email_text }}
7454"#;
7455
7456        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7457        let options = YamlWorkflowRunOptions {
7458            telemetry: YamlWorkflowTelemetryConfig {
7459                sample_rate: f32::NAN,
7460                ..YamlWorkflowTelemetryConfig::default()
7461            },
7462            ..YamlWorkflowRunOptions::default()
7463        };
7464
7465        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
7466            &workflow,
7467            &json!({"email_text":"hello"}),
7468            &MockExecutor,
7469            None,
7470            None,
7471            &options,
7472        )
7473        .await
7474        .expect_err("nan sample_rate should fail");
7475
7476        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7477    }
7478
7479    #[test]
7480    fn subworkflow_options_inherit_parent_telemetry_configuration() {
7481        let mut parent_options = YamlWorkflowRunOptions::default();
7482        parent_options.telemetry.sample_rate = 0.0;
7483        parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
7484        parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
7485        parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
7486
7487        let parent_context = TraceContext {
7488            trace_id: Some("trace-parent".to_string()),
7489            span_id: Some("span-parent".to_string()),
7490            parent_span_id: None,
7491            traceparent: Some("00-trace-parent-span-parent-01".to_string()),
7492            tracestate: None,
7493            baggage: BTreeMap::new(),
7494        };
7495
7496        let subworkflow_options =
7497            build_subworkflow_options(&parent_options, Some(&parent_context), None);
7498
7499        assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
7500        assert_eq!(
7501            subworkflow_options.telemetry.payload_mode,
7502            YamlWorkflowPayloadMode::RedactedPayload
7503        );
7504        assert_eq!(
7505            subworkflow_options.telemetry.tool_trace_mode,
7506            YamlToolTraceMode::Redacted
7507        );
7508        assert_eq!(
7509            subworkflow_options.trace.tenant.conversation_id.as_deref(),
7510            Some("conv-123")
7511        );
7512        assert_eq!(
7513            subworkflow_options
7514                .trace
7515                .context
7516                .as_ref()
7517                .and_then(|ctx| ctx.trace_id.as_deref()),
7518            Some("trace-parent")
7519        );
7520    }
7521
7522    #[test]
7523    fn trace_tenant_attributes_include_langfuse_aliases() {
7524        let tenant = YamlWorkflowTraceTenantContext {
7525            workspace_id: Some("ws-1".to_string()),
7526            user_id: Some("user-1".to_string()),
7527            conversation_id: Some("conv-1".to_string()),
7528            request_id: Some("req-1".to_string()),
7529            run_id: Some("run-1".to_string()),
7530        };
7531        let mut span = CapturingSpan::default();
7532        apply_trace_tenant_attributes_from_tenant(&mut span, &tenant);
7533
7534        assert_eq!(
7535            span.attributes
7536                .get("tenant.workspace_id")
7537                .map(String::as_str),
7538            Some("ws-1")
7539        );
7540        assert_eq!(
7541            span.attributes.get("tenant.user_id").map(String::as_str),
7542            Some("user-1")
7543        );
7544        assert_eq!(
7545            span.attributes
7546                .get("tenant.conversation_id")
7547                .map(String::as_str),
7548            Some("conv-1")
7549        );
7550        assert_eq!(
7551            span.attributes.get("langfuse.user.id").map(String::as_str),
7552            Some("user-1")
7553        );
7554        assert_eq!(
7555            span.attributes
7556                .get("langfuse.session.id")
7557                .map(String::as_str),
7558            Some("conv-1")
7559        );
7560    }
7561
7562    #[test]
7563    fn langfuse_nerdstats_attributes_are_written_when_enabled() {
7564        let output = YamlWorkflowRunOutput {
7565            workflow_id: "wf-1".to_string(),
7566            entry_node: "start".to_string(),
7567            email_text: "email".to_string(),
7568            trace: vec!["node-1".to_string()],
7569            outputs: BTreeMap::new(),
7570            terminal_node: "node-1".to_string(),
7571            terminal_output: None,
7572            step_timings: vec![YamlStepTiming {
7573                node_id: "node-1".to_string(),
7574                node_kind: "llm_call".to_string(),
7575                model_name: Some("gpt-4.1-mini".to_string()),
7576                elapsed_ms: 42,
7577                prompt_tokens: Some(4),
7578                completion_tokens: Some(6),
7579                total_tokens: Some(10),
7580                reasoning_tokens: Some(2),
7581                tokens_per_second: Some(14.0),
7582            }],
7583            llm_node_metrics: BTreeMap::new(),
7584            llm_node_models: BTreeMap::new(),
7585            total_elapsed_ms: 42,
7586            ttft_ms: Some(7),
7587            total_input_tokens: 4,
7588            total_output_tokens: 6,
7589            total_tokens: 10,
7590            total_reasoning_tokens: Some(2),
7591            tokens_per_second: 14.0,
7592            trace_id: Some("trace-1".to_string()),
7593            metadata: None,
7594        };
7595
7596        let mut span = CapturingSpan::default();
7597        apply_langfuse_nerdstats_attributes(&mut span, &output, true);
7598
7599        assert_eq!(
7600            span.attributes
7601                .get("langfuse.trace.metadata.nerdstats.workflow_id")
7602                .map(String::as_str),
7603            Some("wf-1")
7604        );
7605        assert_eq!(
7606            span.attributes
7607                .get("langfuse.trace.metadata.nerdstats.total_tokens")
7608                .map(String::as_str),
7609            Some("10")
7610        );
7611        assert_eq!(
7612            span.attributes
7613                .get("langfuse.trace.metadata.nerdstats.token_metrics_available")
7614                .map(String::as_str),
7615            Some("true")
7616        );
7617        assert!(span
7618            .attributes
7619            .contains_key("langfuse.trace.metadata.nerdstats"));
7620    }
7621
7622    #[test]
7623    fn langfuse_trace_input_output_and_usage_are_written() {
7624        let output = YamlWorkflowRunOutput {
7625            workflow_id: "wf-1".to_string(),
7626            entry_node: "start".to_string(),
7627            email_text: "email".to_string(),
7628            trace: vec!["node-1".to_string()],
7629            outputs: BTreeMap::new(),
7630            terminal_node: "node-1".to_string(),
7631            terminal_output: Some(json!({"final":"ok"})),
7632            step_timings: Vec::new(),
7633            llm_node_metrics: BTreeMap::new(),
7634            llm_node_models: BTreeMap::new(),
7635            total_elapsed_ms: 10,
7636            ttft_ms: None,
7637            total_input_tokens: 11,
7638            total_output_tokens: 22,
7639            total_tokens: 33,
7640            total_reasoning_tokens: Some(4),
7641            tokens_per_second: 1.5,
7642            trace_id: Some("trace-1".to_string()),
7643            metadata: None,
7644        };
7645
7646        let mut span = CapturingSpan::default();
7647        let input = json!({"email_text":"hello"});
7648        apply_langfuse_trace_input_output_attributes(
7649            &mut span,
7650            &input,
7651            &output,
7652            YamlWorkflowPayloadMode::FullPayload,
7653        );
7654
7655        assert_eq!(
7656            span.attributes
7657                .get("langfuse.trace.input")
7658                .map(String::as_str),
7659            Some("{\"email_text\":\"hello\"}")
7660        );
7661        assert_eq!(
7662            span.attributes
7663                .get("langfuse.trace.output")
7664                .map(String::as_str),
7665            Some("{\"final\":\"ok\"}")
7666        );
7667        assert!(span
7668            .attributes
7669            .contains_key("langfuse.trace.metadata.usage_details"));
7670    }
7671
7672    #[test]
7673    fn langfuse_observation_usage_attributes_are_written() {
7674        let usage = YamlLlmTokenUsage {
7675            prompt_tokens: 7,
7676            completion_tokens: 9,
7677            total_tokens: 16,
7678            reasoning_tokens: Some(3),
7679        };
7680        let mut span = CapturingSpan::default();
7681        apply_langfuse_observation_usage_attributes(&mut span, &usage);
7682
7683        assert_eq!(
7684            span.attributes
7685                .get("gen_ai.usage.input_tokens")
7686                .map(String::as_str),
7687            Some("7")
7688        );
7689        assert_eq!(
7690            span.attributes
7691                .get("gen_ai.usage.output_tokens")
7692                .map(String::as_str),
7693            Some("9")
7694        );
7695        assert_eq!(
7696            span.attributes
7697                .get("gen_ai.usage.total_tokens")
7698                .map(String::as_str),
7699            Some("16")
7700        );
7701        assert_eq!(
7702            span.attributes
7703                .get("gen_ai.usage.reasoning_tokens")
7704                .map(String::as_str),
7705            Some("3")
7706        );
7707        assert!(span
7708            .attributes
7709            .contains_key("langfuse.observation.usage_details"));
7710    }
7711
7712    #[tokio::test]
7713    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
7714        let yaml = r#"
7715id: trace-options-test
7716entry_node: classify
7717nodes:
7718  - id: classify
7719    node_type:
7720      llm_call:
7721        model: gpt-4.1
7722    config:
7723      prompt: |
7724        Classify this email into exactly one category:
7725        {{ input.email_text }}
7726"#;
7727
7728        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7729        let options = YamlWorkflowRunOptions {
7730            telemetry: YamlWorkflowTelemetryConfig {
7731                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
7732                ..YamlWorkflowTelemetryConfig::default()
7733            },
7734            trace: YamlWorkflowTraceOptions {
7735                context: Some(YamlWorkflowTraceContextInput {
7736                    trace_id: Some("trace-fixed-123".to_string()),
7737                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
7738                    ..YamlWorkflowTraceContextInput::default()
7739                }),
7740                tenant: YamlWorkflowTraceTenantContext {
7741                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
7742                    ..YamlWorkflowTraceTenantContext::default()
7743                },
7744            },
7745            model: None,
7746        };
7747
7748        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7749            &workflow,
7750            &json!({"email_text":"hello"}),
7751            &MockExecutor,
7752            None,
7753            None,
7754            &options,
7755        )
7756        .await
7757        .expect("workflow should execute");
7758
7759        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
7760        assert_eq!(
7761            output
7762                .metadata
7763                .as_ref()
7764                .and_then(|value| value.get("telemetry"))
7765                .and_then(|telemetry| telemetry.get("payload_mode"))
7766                .and_then(Value::as_str),
7767            Some("redacted_payload")
7768        );
7769        assert_eq!(
7770            output
7771                .metadata
7772                .as_ref()
7773                .and_then(|value| value.get("trace"))
7774                .and_then(|trace| trace.get("tenant"))
7775                .and_then(|tenant| tenant.get("conversation_id"))
7776                .and_then(Value::as_str),
7777            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
7778        );
7779    }
7780
7781    #[test]
7782    fn streamed_payload_parser_extracts_last_json_object() {
7783        let raw = r#"{"state":"missing_scenario","reason":"ok"}
7784
7785extra reasoning text
7786
7787{"state":"ready","reason":"final"}"#;
7788
7789        let resolved = parse_streamed_structured_payload(raw, false)
7790            .expect("parser should extract final JSON object");
7791        assert_eq!(resolved.payload["state"], "ready");
7792        assert!(resolved.heal_confidence.is_none());
7793    }
7794
7795    #[test]
7796    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
7797        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7798
7799        let resolved = parse_streamed_structured_payload(raw, false)
7800            .expect("parser should recover final structured JSON object");
7801        assert_eq!(resolved.payload["state"], "ready");
7802    }
7803
7804    #[test]
7805    fn streamed_payload_parser_handles_markdown_with_heal() {
7806        let raw = r#"Some preface
7807```json
7808{
7809  "state": "missing_scenario",
7810  "reason": "Need more details"
7811}
7812```
7813Some trailing explanation"#;
7814
7815        let resolved = parse_streamed_structured_payload(raw, true)
7816            .expect("heal path should parse JSON block");
7817        assert_eq!(resolved.payload["state"], "missing_scenario");
7818        assert!(resolved.heal_confidence.is_some());
7819    }
7820
7821    #[test]
7822    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7823        let raw = "No JSON in this streamed output";
7824        let error = parse_streamed_structured_payload(raw, false)
7825            .expect_err("strict stream parse should fail without JSON candidate");
7826        assert!(error.contains("no JSON object candidate found"));
7827    }
7828
7829    #[test]
7830    fn include_raw_stream_debug_events_defaults_to_false() {
7831        let _guard = stream_debug_env_lock()
7832            .lock()
7833            .expect("stream debug env lock should not be poisoned");
7834        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7835        assert!(!include_raw_stream_debug_events());
7836    }
7837
7838    #[test]
7839    fn include_raw_stream_debug_events_accepts_truthy_values() {
7840        let _guard = stream_debug_env_lock()
7841            .lock()
7842            .expect("stream debug env lock should not be poisoned");
7843        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7844        assert!(include_raw_stream_debug_events());
7845        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7846    }
7847
7848    #[test]
7849    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7850        let mut filter = StructuredJsonDeltaFilter::default();
7851        let chunks = vec![
7852            "I will think first... ",
7853            "{\"state\":\"missing_scenario\",",
7854            "\"reason\":\"Need more details\"}",
7855            " additional commentary",
7856        ];
7857
7858        let filtered = chunks
7859            .into_iter()
7860            .filter_map(|chunk| filter.split(chunk).0)
7861            .collect::<String>();
7862
7863        assert_eq!(
7864            filtered,
7865            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7866        );
7867    }
7868
7869    #[test]
7870    fn structured_json_delta_filter_handles_braces_inside_strings() {
7871        let mut filter = StructuredJsonDeltaFilter::default();
7872        let chunks = vec![
7873            "preface ",
7874            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7875            " trailing",
7876        ];
7877
7878        let filtered = chunks
7879            .into_iter()
7880            .filter_map(|chunk| filter.split(chunk).0)
7881            .collect::<String>();
7882
7883        assert_eq!(
7884            filtered,
7885            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7886        );
7887    }
7888
7889    #[test]
7890    fn render_json_object_as_text_converts_top_level_fields() {
7891        let rendered =
7892            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7893        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7894
7895        assert_eq!(lines.len(), 3);
7896        assert!(lines.contains("question: q"));
7897        assert!(lines.contains("confidence: 0.8"));
7898        assert!(lines.contains("nested: {\"a\":1}"));
7899    }
7900
7901    #[test]
7902    fn stream_json_as_text_formatter_emits_once_when_complete() {
7903        let mut formatter = StreamJsonAsTextFormatter::default();
7904        formatter.push("{\"question\":\"hello\"}");
7905
7906        let first = formatter.emit_if_ready(true);
7907        let second = formatter.emit_if_ready(true);
7908
7909        assert_eq!(first, Some("question: hello".to_string()));
7910        assert_eq!(second, None);
7911    }
7912
7913    #[test]
7914    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7915        let expr = "$.nodes.classify.output.output_total == 1";
7916        let rewritten = rewrite_yaml_condition_to_ir(expr);
7917        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7918    }
7919
7920    #[tokio::test]
7921    async fn validates_workflow_input_before_ir_runtime_path() {
7922        let yaml = r#"
7923id: chat-workflow
7924entry_node: classify
7925nodes:
7926  - id: classify
7927    node_type:
7928      llm_call:
7929        model: gpt-4.1
7930    config:
7931      prompt: |
7932        classify
7933"#;
7934
7935        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7936        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
7937            .await
7938            .expect_err("non-object input should fail before execution");
7939
7940        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7941    }
7942
7943    #[test]
7944    fn interpolate_template_supports_dollar_prefixed_paths() {
7945        let context = json!({
7946            "input": {
7947                "email_text": "hello"
7948            }
7949        });
7950
7951        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7952        assert_eq!(rendered, "value=hello");
7953    }
7954}