//! YAML workflow runner (simple_agents_workflow/yaml_runner.rs).
//! Executes YAML-defined workflows and records telemetry, tracing, and token metrics.
1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14    ToolFunction, ToolType,
15};
16use simple_agents_core::{
17    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24    flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29    WorkflowRuntimeOptions,
30};
31use crate::visualize::workflow_to_mermaid;
32
// Synthetic node ids injected by the YAML runner (never present in user-authored workflows).
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

// Process-wide counter mixed into generated trace ids to avoid collisions within one process.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
37
/// Timing (and, when available, token usage) recorded for one executed workflow step.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    pub node_id: String,
    pub node_kind: String,
    pub elapsed_ms: u128,
    // Token fields are omitted from serialized output when the step reported no usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
54
/// Aggregated timing/token metrics for a single LLM node (keyed by node id in the run output).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
65
/// Final result of executing a YAML-defined workflow: per-node outputs,
/// timing/token telemetry, and the resolved trace id.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    pub email_text: String,
    // Node ids visited during the run (presumably in execution order — confirm against the runner).
    pub trace: Vec<String>,
    // Output payload per node id.
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    // Per-LLM-node aggregated metrics and the model used for each node.
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    // Time-to-first-token, when streaming reported one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_thinking_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
92
/// Whether span payloads are recorded verbatim or replaced by a redaction stub.
/// Serialized as "full_payload" / "redacted_payload".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
100
/// How much of a tool call's arguments/outputs to keep in traces:
/// everything, a redaction stub, or nothing at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
109
/// Caller-supplied distributed-tracing context: explicit ids plus
/// W3C-style `traceparent`/`tracestate` headers and baggage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
125
/// Multi-tenant identifiers attached to spans and run metadata; all optional.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
139
/// Telemetry knobs for one run. The serde field defaults must stay in sync
/// with the hand-written `Default` impl below.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch; when false no trace id is resolved for the run.
    #[serde(default = "default_true")]
    pub enabled: bool,
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Sampling probability in [0.0, 1.0]; checked by `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
157
impl Default for YamlWorkflowTelemetryConfig {
    /// Mirrors the per-field serde defaults (`default_true`, `default_sample_rate`,
    /// `default_retention_days`); keep the two in sync when changing either.
    fn default() -> Self {
        Self {
            enabled: true,
            nerdstats: true,
            sample_rate: 1.0,
            payload_mode: YamlWorkflowPayloadMode::FullPayload,
            retention_days: 30,
            multi_tenant: true,
            tool_trace_mode: YamlToolTraceMode::Full,
        }
    }
}
171
/// Tracing inputs for a run: optional propagation context plus tenant identifiers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
179
/// Per-run options: telemetry configuration, trace inputs, and an optional
/// model override applied to LLM nodes (see `resolve_requested_model`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    #[serde(default)]
    pub model: Option<String>,
}
189
/// Provider-reported token usage for a single LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    // Present only when the provider reports thinking/reasoning tokens.
    pub thinking_tokens: Option<u32>,
}
197
/// Result of executing one LLM node: final payload, usage (when reported),
/// time-to-first-token, and any tool calls made along the way.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
205
/// Trace record of a single tool invocation made during an LLM call.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    pub arguments: Value,
    // None when the call produced no output (e.g. it failed — see `error`).
    pub output: Option<Value>,
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
216
/// Running token totals accumulated across every LLM call in a run.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    // None until at least one usage report includes thinking tokens.
    thinking_tokens: Option<u64>,
}
224
225impl YamlTokenTotals {
226    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
227        self.input_tokens += u64::from(usage.prompt_tokens);
228        self.output_tokens += u64::from(usage.completion_tokens);
229        self.total_tokens += u64::from(usage.total_tokens);
230
231        if let Some(thinking_tokens) = usage.thinking_tokens {
232            let next = self.thinking_tokens.unwrap_or(0) + u64::from(thinking_tokens);
233            self.thinking_tokens = Some(next);
234        }
235    }
236
237    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
238        if elapsed_ms == 0 {
239            return 0.0;
240        }
241        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
242    }
243}
244
/// Round `value` to two decimal places (half-away-from-zero, per `f64::round`).
fn round_two_decimals(value: f64) -> f64 {
    let cents = (value * 100.0).round();
    cents / 100.0
}
248
/// Throughput for a single completion in tokens/second, rounded to two
/// decimal places. Returns 0.0 when `elapsed_ms` is zero (no division by zero).
fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
    if elapsed_ms == 0 {
        return 0.0;
    }
    let raw = f64::from(completion_tokens) * 1000.0 / (elapsed_ms as f64);
    (raw * 100.0).round() / 100.0
}
255
/// Pick the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's own configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => node_model.to_string(),
    }
}
268
// Serde default helpers — must stay in sync with `YamlWorkflowTelemetryConfig::default`.
fn default_true() -> bool {
    true
}

fn default_sample_rate() -> f32 {
    1.0
}

fn default_retention_days() -> u32 {
    30
}
280
281fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
282    if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
283        return Ok(());
284    }
285
286    Err(YamlWorkflowRunError::InvalidInput {
287        message: format!(
288            "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
289        ),
290    })
291}
292
/// Deterministic trace sampling: hash the trace id (FNV-1a, 64-bit), map it
/// to [0, 1), and compare against the configured rate. Rates >= 1.0 always
/// sample; rates <= 0.0 never do.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a over the raw bytes of the trace id.
    let hash = trace_id.bytes().fold(0xcbf29ce484222325u64, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
    });

    let ratio = (hash as f64) / (u64::MAX as f64);
    ratio < f64::from(sample_rate)
}
310
/// Extract the trace id from a W3C `traceparent` header
/// (`version-traceid-spanid-flags`). Requires exactly four dash-separated
/// fields, a 2-hex-digit version, and a 32-hex-digit non-zero trace id;
/// returns the trace id lowercased. Span id and flags are not validated here.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let parts: Vec<&str> = traceparent.split('-').collect();
    if parts.len() != 4 {
        return None;
    }
    let (version, trace_id) = (parts[0], parts[1]);

    let all_hex = |s: &str| s.chars().all(|ch| ch.is_ascii_hexdigit());
    if version.len() != 2 || !all_hex(version) {
        return None;
    }
    if trace_id.len() != 32 || !all_hex(trace_id) {
        return None;
    }
    // The all-zero trace id is invalid per the W3C spec.
    if trace_id.chars().all(|ch| ch == '0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
337
/// Where a run's trace id came from; serialized into run metadata by
/// `workflow_metadata_with_trace` for debugging trace propagation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    Disabled,
    ExplicitTraceId,
    ParentTraceId,
    Traceparent,
    ParentTraceparent,
    Generated,
}
347
/// Outcome of trace-id resolution for a run: the id (if telemetry is enabled),
/// whether this run is sampled, and how the id was obtained.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
354
/// Resolve the trace id for a run, and record where it came from.
///
/// Precedence (first hit wins):
/// 1. explicit `trace.context.trace_id` in the run options,
/// 2. the parent span context's trace id,
/// 3. a `traceparent` header in the run options,
/// 4. a `traceparent` on the parent span context,
/// 5. a freshly generated id.
///
/// When telemetry is disabled, short-circuits to `(None, Disabled)`.
fn resolve_run_trace_id_with_source(
    options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
) -> (Option<String>, TraceIdSource) {
    if !options.telemetry.enabled {
        return (None, TraceIdSource::Disabled);
    }

    if let Some(trace_id) = options
        .trace
        .context
        .as_ref()
        .and_then(|context| context.trace_id.clone())
    {
        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
    }

    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
        return (Some(trace_id), TraceIdSource::ParentTraceId);
    }

    if let Some(trace_id) = options
        .trace
        .context
        .as_ref()
        .and_then(|context| context.traceparent.as_deref())
        .and_then(trace_id_from_traceparent)
    {
        return (Some(trace_id), TraceIdSource::Traceparent);
    }

    if let Some(trace_id) = parent_trace_context
        .and_then(|context| context.traceparent.as_deref())
        .and_then(trace_id_from_traceparent)
    {
        return (Some(trace_id), TraceIdSource::ParentTraceparent);
    }

    (Some(generate_trace_id()), TraceIdSource::Generated)
}
395
396fn resolve_telemetry_context(
397    options: &YamlWorkflowRunOptions,
398    parent_trace_context: Option<&TraceContext>,
399) -> ResolvedTelemetryContext {
400    let (trace_id, trace_id_source) =
401        resolve_run_trace_id_with_source(options, parent_trace_context);
402    let sampled = trace_id
403        .as_deref()
404        .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
405        .unwrap_or(false);
406
407    ResolvedTelemetryContext {
408        trace_id,
409        sampled,
410        trace_id_source,
411    }
412}
413
414fn generate_trace_id() -> String {
415    use std::time::{SystemTime, UNIX_EPOCH};
416
417    let now_nanos = SystemTime::now()
418        .duration_since(UNIX_EPOCH)
419        .map(|duration| duration.as_nanos())
420        .unwrap_or(0);
421    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
422    format!("{:032x}", now_nanos ^ sequence)
423}
424
/// Build the `metadata` JSON attached to a run output: the telemetry settings
/// in effect, the resolved trace id and its provenance, and tenant identifiers.
fn workflow_metadata_with_trace(
    options: &YamlWorkflowRunOptions,
    trace_id: &str,
    sampled: bool,
    trace_id_source: TraceIdSource,
) -> Value {
    json!({
        "telemetry": {
            "trace_id": trace_id,
            // String forms mirror the enum variants in snake_case.
            "trace_id_source": match trace_id_source {
                TraceIdSource::Disabled => "disabled",
                TraceIdSource::ExplicitTraceId => "explicit_trace_id",
                TraceIdSource::ParentTraceId => "parent_trace_id",
                TraceIdSource::Traceparent => "traceparent",
                TraceIdSource::ParentTraceparent => "parent_traceparent",
                TraceIdSource::Generated => "generated",
            },
            "enabled": options.telemetry.enabled,
            "sampled": sampled,
            "nerdstats": options.telemetry.nerdstats,
            "sample_rate": options.telemetry.sample_rate,
            "payload_mode": match options.telemetry.payload_mode {
                YamlWorkflowPayloadMode::FullPayload => "full_payload",
                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
            },
            "retention_days": options.telemetry.retention_days,
            "multi_tenant": options.telemetry.multi_tenant,
            "tool_trace_mode": match options.telemetry.tool_trace_mode {
                YamlToolTraceMode::Full => "full",
                YamlToolTraceMode::Redacted => "redacted",
                YamlToolTraceMode::Off => "off",
            },
        },
        "trace": {
            "tenant": {
                "workspace_id": options.trace.tenant.workspace_id,
                "user_id": options.trace.tenant.user_id,
                "conversation_id": options.trace.tenant.conversation_id,
                "request_id": options.trace.tenant.request_id,
                "run_id": options.trace.tenant.run_id,
            }
        },
    })
}
469
470fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
471    if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
472        span.set_attribute("tenant.workspace_id", workspace_id);
473    }
474    if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
475        span.set_attribute("tenant.user_id", user_id);
476    }
477    if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
478        span.set_attribute("tenant.conversation_id", conversation_id);
479    }
480    if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
481        span.set_attribute("tenant.request_id", request_id);
482    }
483    if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
484        span.set_attribute("tenant.run_id", run_id);
485    }
486}
487
488fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
489    let llm_nodes_without_usage: Vec<String> = output
490        .step_timings
491        .iter()
492        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
493        .map(|step| step.node_id.clone())
494        .collect();
495    let token_metrics_available = llm_nodes_without_usage.is_empty();
496    let token_metrics_source = if token_metrics_available {
497        "provider_usage"
498    } else {
499        "provider_stream_usage_unavailable"
500    };
501    let total_input_tokens = if token_metrics_available {
502        json!(output.total_input_tokens)
503    } else {
504        Value::Null
505    };
506    let total_output_tokens = if token_metrics_available {
507        json!(output.total_output_tokens)
508    } else {
509        Value::Null
510    };
511    let total_tokens = if token_metrics_available {
512        json!(output.total_tokens)
513    } else {
514        Value::Null
515    };
516    let total_thinking_tokens = if token_metrics_available {
517        json!(output.total_thinking_tokens)
518    } else {
519        Value::Null
520    };
521    let tokens_per_second = if token_metrics_available {
522        json!(output.tokens_per_second)
523    } else {
524        Value::Null
525    };
526
527    json!({
528        "workflow_id": output.workflow_id,
529        "terminal_node": output.terminal_node,
530        "total_elapsed_ms": output.total_elapsed_ms,
531        "ttft_ms": output.ttft_ms,
532        "step_details": output.step_timings,
533        "llm_node_models": output.llm_node_models,
534        "total_input_tokens": total_input_tokens,
535        "total_output_tokens": total_output_tokens,
536        "total_tokens": total_tokens,
537        "total_thinking_tokens": total_thinking_tokens,
538        "tokens_per_second": tokens_per_second,
539        "trace_id": output.trace_id,
540        "token_metrics_available": token_metrics_available,
541        "token_metrics_source": token_metrics_source,
542        "llm_nodes_without_usage": llm_nodes_without_usage,
543    })
544}
545
546fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
547    match mode {
548        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
549        YamlWorkflowPayloadMode::RedactedPayload => json!({
550            "redacted": true,
551            "value_type": match payload {
552                Value::Null => "null",
553                Value::Bool(_) => "bool",
554                Value::Number(_) => "number",
555                Value::String(_) => "string",
556                Value::Array(_) => "array",
557                Value::Object(_) => "object",
558            }
559        })
560        .to_string(),
561    }
562}
563
564fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
565    match mode {
566        YamlToolTraceMode::Full => payload.clone(),
567        YamlToolTraceMode::Redacted => json!({
568            "redacted": true,
569            "value_type": json_type_name(payload),
570        }),
571        YamlToolTraceMode::Off => Value::Null,
572    }
573}
574
575fn validate_json_schema(schema: &Value) -> Result<(), String> {
576    jsonschema::JSONSchema::compile(schema)
577        .map(|_| ())
578        .map_err(|error| format!("invalid JSON schema: {error}"))
579}
580
581fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
582    let validator = jsonschema::JSONSchema::compile(schema)
583        .map_err(|error| format!("invalid JSON schema: {error}"))?;
584    if let Err(errors) = validator.validate(instance) {
585        let message = errors
586            .into_iter()
587            .next()
588            .map(|error| error.to_string())
589            .unwrap_or_else(|| "unknown schema validation error".to_string());
590        return Err(format!("schema validation failed: {message}"));
591    }
592    Ok(())
593}
594
595fn schema_type(schema: &Value) -> Option<&str> {
596    schema.get("type").and_then(Value::as_str)
597}
598
599fn schema_expects_object(schema: &Value) -> bool {
600    schema_type(schema) == Some("object")
601}
602
603fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
604    options.trace.context.as_ref().map(|input| TraceContext {
605        trace_id: input.trace_id.clone(),
606        span_id: input.span_id.clone(),
607        parent_span_id: input.parent_span_id.clone(),
608        traceparent: input.traceparent.clone(),
609        tracestate: input.tracestate.clone(),
610        baggage: input.baggage.clone(),
611    })
612}
613
/// Merge the trace context to hand to a worker.
///
/// Per-field precedence: the live span context wins, then (for `trace_id`
/// only) the resolved run trace id, then the caller-supplied input context.
/// Baggage is taken wholesale from the span context when non-empty, otherwise
/// from the input context — the two maps are never merged key-by-key.
fn merged_trace_context_for_worker(
    span_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
    options: &YamlWorkflowRunOptions,
) -> TraceContext {
    let input_context = options.trace.context.as_ref();
    let baggage = if let Some(context) = span_context {
        if !context.baggage.is_empty() {
            context.baggage.clone()
        } else {
            input_context
                .map(|value| value.baggage.clone())
                .unwrap_or_default()
        }
    } else {
        input_context
            .map(|value| value.baggage.clone())
            .unwrap_or_default()
    };

    TraceContext {
        // trace_id is the only field that can fall back to the resolved run id.
        trace_id: span_context
            .and_then(|context| context.trace_id.clone())
            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
        span_id: span_context
            .and_then(|context| context.span_id.clone())
            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
        parent_span_id: span_context
            .and_then(|context| context.parent_span_id.clone())
            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
        traceparent: span_context
            .and_then(|context| context.traceparent.clone())
            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
        tracestate: span_context
            .and_then(|context| context.tracestate.clone())
            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
        baggage,
    }
}
654
/// Clone `context` and inject a `trace` object carrying the propagation
/// context and tenant identifiers. Non-object contexts are returned unchanged
/// (a clone) — there is nowhere to attach the trace. An existing `trace` key
/// is overwritten.
fn custom_worker_context_with_trace(
    context: &Value,
    trace_context: &TraceContext,
    tenant_context: &YamlWorkflowTraceTenantContext,
) -> Value {
    let mut context_with_trace = context.clone();
    let Some(root) = context_with_trace.as_object_mut() else {
        return context_with_trace;
    };

    root.insert(
        "trace".to_string(),
        json!({
            "context": {
                "trace_id": trace_context.trace_id,
                "span_id": trace_context.span_id,
                "parent_span_id": trace_context.parent_span_id,
                "traceparent": trace_context.traceparent,
                "tracestate": trace_context.tracestate,
                "baggage": trace_context.baggage,
            },
            "tenant": {
                "workspace_id": tenant_context.workspace_id,
                "user_id": tenant_context.user_id,
                "conversation_id": tenant_context.conversation_id,
                "request_id": tenant_context.request_id,
                "run_id": tenant_context.run_id,
            }
        }),
    );

    context_with_trace
}
688
/// Opt-in debug switch: true when SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW
/// is set to a truthy value ("1", "true", "yes", "on"; case/whitespace
/// insensitive). Unset or unreadable env var means false.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
698
/// A parsed payload recovered from streamed structured-completion text, plus
/// the healer's confidence score when JSON healing was applied (None otherwise).
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    heal_confidence: Option<f32>,
}
704
/// Accumulates streamed JSON text and, once the stream completes, renders it
/// as "key: value" lines exactly once (`emitted` guards against re-emission).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    raw_json: String,
    emitted: bool,
}
710
711impl StreamJsonAsTextFormatter {
712    fn push(&mut self, chunk: &str) {
713        self.raw_json.push_str(chunk);
714    }
715
716    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
717        if self.emitted || !complete {
718            return None;
719        }
720        self.emitted = true;
721        Some(render_json_object_as_text(self.raw_json.as_str()))
722    }
723}
724
725fn render_json_object_as_text(raw_json: &str) -> String {
726    let value = match serde_json::from_str::<Value>(raw_json) {
727        Ok(value) => value,
728        Err(_) => return raw_json.to_string(),
729    };
730    let Some(object) = value.as_object() else {
731        return raw_json.to_string();
732    };
733
734    let mut lines = Vec::with_capacity(object.len());
735    for (key, value) in object {
736        let rendered = match value {
737            Value::String(text) => text.clone(),
738            _ => value.to_string(),
739        };
740        lines.push(format!("{key}: {rendered}"));
741    }
742    lines.join("\n")
743}
744
/// Classification of a streamed token: regular output vs. model "thinking"
/// text. Serialized as "output" / "thinking".
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
751
/// Streaming state machine that separates the first top-level JSON object in
/// a token stream from surrounding free text (see `split`).
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    // True once the opening '{' has been seen.
    started: bool,
    // True once the matching top-level '}' has closed the object.
    completed: bool,
    // Current brace-nesting depth inside the object.
    depth: u32,
    // JSON string-literal state, so braces inside strings are not counted.
    in_string: bool,
    escape: bool,
}
760
impl StructuredJsonDeltaFilter {
    /// Split one streamed delta into (structured-output, thinking) fragments.
    ///
    /// Characters before the first `{` and after the top-level object closes
    /// are routed to "thinking"; everything from the opening brace through the
    /// matching close brace (inclusive) is routed to "output". Brace depth is
    /// tracked with JSON string/escape awareness so braces inside string
    /// literals do not affect nesting. State persists across calls, so deltas
    /// may split the object at any character boundary.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            // After the object closes, everything is trailing "thinking" text.
            if self.completed {
                thinking.push(ch);
                continue;
            }

            // Before the object opens, forward text until the first '{'.
            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            // Inside a string literal: only track escapes and the closing quote.
            if self.in_string {
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// True once the top-level JSON object has been fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
836
/// Return the contents of the *last* ```json fenced block in `raw`, trimmed.
/// Returns `None` when there is no such fence, no closing fence, or the block
/// is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const FENCE_OPEN: &str = "```json";
    let body_start = raw.rfind(FENCE_OPEN)? + FENCE_OPEN.len();
    let body = &raw[body_start..];
    let body_end = body.find("```")?;
    let candidate = body[..body_end].trim();
    (!candidate.is_empty()).then_some(candidate)
}
847
/// Scan forward from byte offset `start_index` (which must lie on a char
/// boundary) and return the slice covering the first brace-balanced `{...}`
/// region, string/escape aware. Returns `None` on an unbalanced `}` at depth
/// zero or when the input ends before the object closes.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth: u32 = 0;
    let mut in_string = false;
    let mut escape = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        // Inside a string literal: only escapes and the closing quote matter.
        if in_string {
            if escape {
                escape = false;
            } else if ch == '\\' {
                escape = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            // A close brace before any open brace means unbalanced input.
            '}' if depth == 0 => return None,
            '}' => {
                depth -= 1;
                if depth == 0 {
                    let end_index = start_index + offset + ch.len_utf8();
                    return Some(raw[start_index..end_index].trim());
                }
            }
            _ => {}
        }
    }

    None
}
888
889fn extract_last_parsable_object(raw: &str) -> Option<&str> {
890    let starts: Vec<usize> = raw
891        .char_indices()
892        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
893        .collect();
894
895    for start in starts.into_iter().rev() {
896        let Some(candidate) = extract_balanced_object_from(raw, start) else {
897            continue;
898        };
899        if serde_json::from_str::<Value>(candidate).is_ok() {
900            return Some(candidate);
901        }
902    }
903
904    None
905}
906
907fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
908    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
909}
910
/// Parse the fully-accumulated streamed text into a structured JSON payload.
///
/// Strict path (`heal == false`): try the raw text as JSON, then fall back to
/// the best fenced/balanced candidate; any parse failure is an error and
/// `heal_confidence` is always `None`.
///
/// Healing path (`heal == true`): run the candidate (or the raw text when no
/// candidate is found) through `JsonishParser`, which repairs malformed JSON
/// and reports a confidence score for the repair.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
949
/// One event emitted while streaming a workflow run. All fields except
/// `event_type` are optional because different event types populate
/// different subsets.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    // Incremental token text for streaming events, classified by `token_kind`.
    pub delta: Option<String>,
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
964
965pub type WorkflowMessageRole = Role;
966
967#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
968pub struct WorkflowMessage {
969    pub role: WorkflowMessageRole,
970    pub content: String,
971    #[serde(default)]
972    pub name: Option<String>,
973    #[serde(default, alias = "toolCallId")]
974    pub tool_call_id: Option<String>,
975}
976
977#[derive(Debug, Clone, PartialEq, Serialize)]
978pub struct YamlTemplateBinding {
979    pub index: usize,
980    pub expression: String,
981    pub source_path: String,
982    pub resolved: Value,
983    pub resolved_type: String,
984    pub missing: bool,
985}
986
987#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
988pub enum YamlWorkflowDiagnosticSeverity {
989    Error,
990    Warning,
991}
992
993#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
994pub struct YamlWorkflowDiagnostic {
995    pub node_id: Option<String>,
996    pub code: String,
997    pub severity: YamlWorkflowDiagnosticSeverity,
998    pub message: String,
999}
1000
/// Errors surfaced while loading, validating, or executing a YAML workflow.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    // File could not be read from disk.
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    // File contents were not valid workflow YAML.
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    // Aggregated error-severity diagnostics from pre-run validation.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    // The event sink asked for cancellation mid-run.
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1045
/// Receiver for workflow progress events. Implementations may also report
/// cancellation through `is_cancelled`, which callers can poll.
pub trait YamlWorkflowEventSink: Send + Sync {
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Default: never cancelled.
    fn is_cancelled(&self) -> bool {
        false
    }
}

/// Event sink that discards every event and never cancels.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1059
/// Canonical message used when a workflow event sink requests cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const MESSAGE: &str = "workflow event callback cancelled";
    MESSAGE
}
1063
1064fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1065    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1066}
1067
/// Errors raised while converting a YAML workflow into the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    // IR llm/tool nodes carry a single `next`, so YAML fan-out is rejected.
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1077
1078/// Render a YAML workflow graph as Mermaid flowchart.
1079pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1080    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1081        return workflow_to_mermaid(&ir);
1082    }
1083
1084    yaml_workflow_to_mermaid_fallback(workflow)
1085}
1086
/// Direct YAML → Mermaid rendering, used when IR conversion fails
/// (see [`yaml_workflow_to_mermaid`] callers in this file).
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    for node in &workflow.nodes {
        // Rectangular node labelled "id\n(kind)".
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "  {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        // Each declared llm tool becomes a rounded satellite node attached
        // to its owning node with a dotted edge.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "  {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "  {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Deduplicate edges by (from, label, to); plain edges carry an empty label.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labelled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output regardless of HashSet iteration order.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "  {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "  {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    // Shared styling class for all tool satellite nodes.
    if !tool_node_ids.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1168
1169fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1170    llm.tools
1171        .iter()
1172        .map(|tool| match tool {
1173            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1174            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1175        })
1176        .collect()
1177}
1178
1179/// Load a YAML workflow file and render it as Mermaid flowchart.
1180pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
1181    let contents =
1182        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1183            path: workflow_path.display().to_string(),
1184            source,
1185        })?;
1186
1187    let workflow: YamlWorkflow =
1188        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1189            path: workflow_path.display().to_string(),
1190            source,
1191        })?;
1192
1193    let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
1194    if referenced_subgraphs.is_empty() {
1195        return Ok(yaml_workflow_to_mermaid(&workflow));
1196    }
1197
1198    Ok(yaml_workflow_to_mermaid_with_subgraphs(
1199        &workflow,
1200        &referenced_subgraphs,
1201    ))
1202}
1203
/// A sibling workflow rendered as a Mermaid subgraph, keyed by the alias the
/// main workflow used to reference it.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    alias: String,
    workflow: YamlWorkflow,
}

/// Output of rendering one workflow as Mermaid lines (without the
/// `flowchart` header), plus node-id bookkeeping for styling and linking.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    lines: Vec<String>,
    // Tool satellite nodes, collected for `toolNode` class styling.
    tool_node_ids: Vec<String>,
    // Subset of tool nodes whose declared name is `run_workflow_graph`.
    run_workflow_tool_node_ids: Vec<String>,
    entry_node_id: String,
}
1217
/// Render the main workflow plus the sibling workflows it references as one
/// Mermaid flowchart, with one subgraph cluster per workflow.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Main workflow in its own cluster; node ids are prefixed with "main"
    // so they cannot collide with subgraph node ids.
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "  subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("    {line}"));
    }
    lines.push("  end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "  subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("    {line}"));
        }
        lines.push("  end".to_string());

        // NOTE(review): every run_workflow_graph tool node in the main block
        // is linked to every referenced subgraph's entry node — confirm this
        // fan-out is intended when several subgraphs are referenced.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "  {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    // Shared styling for tool satellite nodes across all clusters.
    if !all_tool_nodes.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1268
/// Render one workflow's nodes and edges as Mermaid lines (no `flowchart`
/// header), with every node id namespaced by `prefix` so multiple blocks can
/// coexist in a single chart.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        // Declared llm tools become rounded satellite nodes; run_workflow_graph
        // tools are tracked separately so callers can link them to subgraphs.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // Deduplicate edges by (from, label, to); plain edges carry an empty label.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labelled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output regardless of HashSet iteration order.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1344
1345fn discover_referenced_subgraphs(
1346    workflow_path: &Path,
1347    workflow: &YamlWorkflow,
1348) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
1349    let workflow_ids = referenced_workflow_ids(workflow);
1350    if workflow_ids.is_empty() {
1351        return Ok(Vec::new());
1352    }
1353
1354    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
1355    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;
1356
1357    let mut discovered = Vec::new();
1358    let mut seen = HashSet::new();
1359
1360    for workflow_id in workflow_ids {
1361        let normalized = normalize_workflow_lookup_key(&workflow_id);
1362        if seen.contains(&normalized) {
1363            continue;
1364        }
1365
1366        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
1367        {
1368            discovered.push(MermaidSubgraphWorkflow {
1369                alias: workflow_id.clone(),
1370                workflow: subworkflow.clone(),
1371            });
1372            seen.insert(normalized);
1373        }
1374    }
1375
1376    Ok(discovered)
1377}
1378
1379fn load_yaml_sibling_workflows(
1380    parent_dir: &Path,
1381    workflow_path: &Path,
1382) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
1383    let mut results = Vec::new();
1384    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
1385        path: parent_dir.display().to_string(),
1386        source,
1387    })?;
1388
1389    for entry in entries {
1390        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
1391            path: parent_dir.display().to_string(),
1392            source,
1393        })?;
1394        let path = entry.path();
1395        if !is_yaml_file(&path) {
1396            continue;
1397        }
1398
1399        if path == workflow_path {
1400            continue;
1401        }
1402
1403        let contents =
1404            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
1405                path: path.display().to_string(),
1406                source,
1407            })?;
1408        let subworkflow: YamlWorkflow =
1409            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1410                path: path.display().to_string(),
1411                source,
1412            })?;
1413
1414        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
1415            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
1416        }
1417        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
1418    }
1419
1420    Ok(results)
1421}
1422
1423fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
1424    let mut ids = Vec::new();
1425    let mut seen = HashSet::new();
1426
1427    for node in &workflow.nodes {
1428        let prompt = node
1429            .config
1430            .as_ref()
1431            .and_then(|config| config.prompt.as_deref());
1432
1433        if let Some(llm) = node.node_type.llm_call.as_ref() {
1434            for tool in &llm.tools {
1435                if tool_declaration_name(tool) != "run_workflow_graph" {
1436                    continue;
1437                }
1438
1439                for workflow_id in referenced_workflow_ids_from_tool(tool) {
1440                    let normalized = normalize_workflow_lookup_key(&workflow_id);
1441                    if seen.insert(normalized) {
1442                        ids.push(workflow_id);
1443                    }
1444                }
1445
1446                if let Some(prompt_text) = prompt {
1447                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
1448                        let normalized = normalize_workflow_lookup_key(&workflow_id);
1449                        if seen.insert(normalized) {
1450                            ids.push(workflow_id);
1451                        }
1452                    }
1453                }
1454            }
1455        }
1456    }
1457
1458    ids
1459}
1460
1461fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1462    let schema = match tool {
1463        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1464        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1465    };
1466
1467    let mut ids = Vec::new();
1468    if let Some(schema) = schema {
1469        if let Some(workflow_prop) = schema
1470            .get("properties")
1471            .and_then(Value::as_object)
1472            .and_then(|properties| properties.get("workflow_id"))
1473        {
1474            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1475                ids.push(value.to_string());
1476            }
1477
1478            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1479                for value in enum_values {
1480                    if let Some(value) = value.as_str() {
1481                        ids.push(value.to_string());
1482                    }
1483                }
1484            }
1485        }
1486    }
1487
1488    ids
1489}
1490
/// Scan prompt text for `"workflow_id": "<id>"` snippets and return each id
/// found, in order (duplicates are kept; callers dedupe).
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut ids = Vec::new();
    let mut search = prompt;

    loop {
        let Some(index) = search.find(KEY) else {
            break;
        };
        let remainder = &search[index + KEY.len()..];
        let Some(colon_index) = remainder.find(':') else {
            break;
        };
        let after_colon = &remainder[colon_index + 1..];

        // Expect a double-quoted string value after the colon; anything else
        // (or an unterminated string) just resumes scanning after the colon.
        let Some(rest) = after_colon.trim_start().strip_prefix('"') else {
            search = after_colon;
            continue;
        };
        let Some(end_quote_index) = rest.find('"') else {
            search = after_colon;
            continue;
        };

        let workflow_id = rest[..end_quote_index].trim();
        if !workflow_id.is_empty() {
            ids.push(workflow_id.to_string());
        }
        search = &rest[end_quote_index + 1..];
    }

    ids
}
1518
/// Normalize a workflow identifier for lookup: ASCII-lowercase it and treat
/// dashes as underscores.
fn normalize_workflow_lookup_key(value: &str) -> String {
    value.to_ascii_lowercase().replace('-', "_")
}
1528
/// True when the path carries a `.yaml` or `.yml` extension (case-sensitive).
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1535
1536fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1537    sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1538}
1539
/// Name of a declared tool, regardless of whether it uses the OpenAI or the
/// simplified declaration shape.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1546
/// Convert a YAML workflow into the canonical IR `WorkflowDefinition`.
///
/// A synthetic `__yaml_start` node is prepended that jumps to the YAML entry
/// node. `llm_call` nodes become IR tool nodes invoking the internal
/// `__yaml_llm_call` tool, `custom_worker` nodes become plain tool nodes,
/// and `switch` nodes become routers. Features the IR cannot represent yet
/// (llm tools, set_globals/update_globals) are rejected with
/// `YamlToIrError::UnsupportedNode`.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of YAML edges; IR llm/tool nodes allow at most one.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    // Synthetic IR start node pointing at the YAML entry node.
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet — refuse rather
            // than silently drop it.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // The llm call is modelled as an invocation of the internal
            // `__yaml_llm_call` tool; its YAML config is packed into the
            // tool input payload.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as llm nodes above.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch branches map 1:1 to router routes; condition paths are
            // rewritten from the YAML layout to the IR layout.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1694
1695fn single_next_for_node(
1696    outgoing: &HashMap<&str, Vec<&str>>,
1697    node_id: &str,
1698) -> Result<Option<String>, YamlToIrError> {
1699    match outgoing.get(node_id) {
1700        None => Ok(None),
1701        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1702        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1703            node_id: node_id.to_string(),
1704        }),
1705    }
1706}
1707
/// Translate a YAML switch condition path into the canonical IR layout:
/// `$.nodes.X.output.Y` becomes `$.node_outputs.X.Y`.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    // A bare trailing `.output` (no further segment) is dropped as well.
    if rewritten.ends_with(".output") {
        rewritten[..rewritten.len() - ".output".len()].to_string()
    } else {
        rewritten
    }
}
1718
/// Make an arbitrary node id safe for Mermaid: prefix "n_" when the first
/// character is not an ASCII letter or underscore (including the empty id),
/// then replace every non-alphanumeric, non-underscore character with `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_safely = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut out = String::with_capacity(id.len() + 2);
    if !starts_safely {
        out.push_str("n_");
    }
    out.extend(id.chars().map(|ch| {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            ch
        } else {
            '_'
        }
    }));
    out
}
1742
/// Escape double quotes so a label can be embedded in a quoted Mermaid node.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push_str("\\\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
1746
/// Fully-resolved request for one llm node execution: rendered prompt,
/// resolved tools, streaming/healing flags, and tracing metadata.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    pub node_id: String,
    // Presumably true when this node's output is the workflow's final
    // output — confirm against the runner that sets it.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    pub model: String,
    // Explicit message history; when present it seeds the conversation.
    pub messages: Option<Vec<Message>>,
    pub append_prompt_as_user: bool,
    // Fully rendered prompt text.
    pub prompt: String,
    // The raw template the prompt was rendered from.
    pub prompt_template: String,
    // Per-expression resolution records for observability.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    // JSON schema the structured completion must satisfy.
    pub schema: Value,
    pub stream: bool,
    pub heal: bool,
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    pub execution_context: Value,
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub trace_sampled: bool,
}

/// A tool made available to the llm, plus an optional schema for its output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    pub output_schema: Option<Value>,
}

/// Executes structured llm completions for workflow nodes; implementations
/// may stream progress through the optional event sink.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}

/// Executes `custom_worker` nodes by handler name; errors are reported as
/// plain strings for the runner to wrap.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1798
1799pub async fn run_workflow_yaml_file(
1800    workflow_path: &Path,
1801    workflow_input: &Value,
1802    executor: &dyn YamlWorkflowLlmExecutor,
1803) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1804    let contents =
1805        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1806            path: workflow_path.display().to_string(),
1807            source,
1808        })?;
1809
1810    let workflow: YamlWorkflow =
1811        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1812            path: workflow_path.display().to_string(),
1813            source,
1814        })?;
1815
1816    run_workflow_yaml(&workflow, workflow_input, executor).await
1817}
1818
1819pub async fn run_email_workflow_yaml_file(
1820    workflow_path: &Path,
1821    email_text: &str,
1822    executor: &dyn YamlWorkflowLlmExecutor,
1823) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1824    let workflow_input = json!({ "email_text": email_text });
1825    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1826}
1827
1828pub async fn run_workflow_yaml_file_with_client(
1829    workflow_path: &Path,
1830    workflow_input: &Value,
1831    client: &SimpleAgentsClient,
1832) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1833    let contents =
1834        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1835            path: workflow_path.display().to_string(),
1836            source,
1837        })?;
1838
1839    let workflow: YamlWorkflow =
1840        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1841            path: workflow_path.display().to_string(),
1842            source,
1843        })?;
1844
1845    run_workflow_yaml_with_client(&workflow, workflow_input, client).await
1846}
1847
1848pub async fn run_email_workflow_yaml_file_with_client(
1849    workflow_path: &Path,
1850    email_text: &str,
1851    client: &SimpleAgentsClient,
1852) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1853    let workflow_input = json!({ "email_text": email_text });
1854    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1855}
1856
1857pub async fn run_workflow_yaml_with_client(
1858    workflow: &YamlWorkflow,
1859    workflow_input: &Value,
1860    client: &SimpleAgentsClient,
1861) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1862    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
1863}
1864
1865pub async fn run_email_workflow_yaml_with_client(
1866    workflow: &YamlWorkflow,
1867    email_text: &str,
1868    client: &SimpleAgentsClient,
1869) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1870    let workflow_input = json!({ "email_text": email_text });
1871    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1872}
1873
1874pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1875    workflow_path: &Path,
1876    workflow_input: &Value,
1877    client: &SimpleAgentsClient,
1878    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1879) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1880    let contents =
1881        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1882            path: workflow_path.display().to_string(),
1883            source,
1884        })?;
1885
1886    let workflow: YamlWorkflow =
1887        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1888            path: workflow_path.display().to_string(),
1889            source,
1890        })?;
1891
1892    run_workflow_yaml_with_client_and_custom_worker(
1893        &workflow,
1894        workflow_input,
1895        client,
1896        custom_worker,
1897    )
1898    .await
1899}
1900
1901pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1902    workflow_path: &Path,
1903    email_text: &str,
1904    client: &SimpleAgentsClient,
1905    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1906) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1907    let workflow_input = json!({ "email_text": email_text });
1908    run_workflow_yaml_file_with_client_and_custom_worker(
1909        workflow_path,
1910        &workflow_input,
1911        client,
1912        custom_worker,
1913    )
1914    .await
1915}
1916
1917pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1918    workflow_path: &Path,
1919    workflow_input: &Value,
1920    client: &SimpleAgentsClient,
1921    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1922    event_sink: Option<&dyn YamlWorkflowEventSink>,
1923) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1924    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1925        workflow_path,
1926        workflow_input,
1927        client,
1928        custom_worker,
1929        event_sink,
1930        &YamlWorkflowRunOptions::default(),
1931    )
1932    .await
1933}
1934
1935pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1936    workflow_path: &Path,
1937    workflow_input: &Value,
1938    client: &SimpleAgentsClient,
1939    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1940    event_sink: Option<&dyn YamlWorkflowEventSink>,
1941    options: &YamlWorkflowRunOptions,
1942) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1943    let contents =
1944        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1945            path: workflow_path.display().to_string(),
1946            source,
1947        })?;
1948
1949    let workflow: YamlWorkflow =
1950        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1951            path: workflow_path.display().to_string(),
1952            source,
1953        })?;
1954
1955    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1956        &workflow,
1957        workflow_input,
1958        client,
1959        custom_worker,
1960        event_sink,
1961        options,
1962    )
1963    .await
1964}
1965
1966pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1967    workflow_path: &Path,
1968    email_text: &str,
1969    client: &SimpleAgentsClient,
1970    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1971    event_sink: Option<&dyn YamlWorkflowEventSink>,
1972) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1973    let workflow_input = json!({ "email_text": email_text });
1974    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1975        workflow_path,
1976        &workflow_input,
1977        client,
1978        custom_worker,
1979        event_sink,
1980    )
1981    .await
1982}
1983
1984pub async fn run_workflow_yaml_with_client_and_custom_worker(
1985    workflow: &YamlWorkflow,
1986    workflow_input: &Value,
1987    client: &SimpleAgentsClient,
1988    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1989) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1990    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1991        workflow,
1992        workflow_input,
1993        client,
1994        custom_worker,
1995        None,
1996        &YamlWorkflowRunOptions::default(),
1997    )
1998    .await
1999}
2000
2001pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
2002    workflow: &YamlWorkflow,
2003    email_text: &str,
2004    client: &SimpleAgentsClient,
2005    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2006) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2007    let workflow_input = json!({ "email_text": email_text });
2008    run_workflow_yaml_with_client_and_custom_worker(
2009        workflow,
2010        &workflow_input,
2011        client,
2012        custom_worker,
2013    )
2014    .await
2015}
2016
2017pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2018    workflow: &YamlWorkflow,
2019    workflow_input: &Value,
2020    client: &SimpleAgentsClient,
2021    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2022    event_sink: Option<&dyn YamlWorkflowEventSink>,
2023) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2024    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2025        workflow,
2026        workflow_input,
2027        client,
2028        custom_worker,
2029        event_sink,
2030        &YamlWorkflowRunOptions::default(),
2031    )
2032    .await
2033}
2034
2035pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2036    workflow: &YamlWorkflow,
2037    workflow_input: &Value,
2038    client: &SimpleAgentsClient,
2039    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2040    event_sink: Option<&dyn YamlWorkflowEventSink>,
2041    options: &YamlWorkflowRunOptions,
2042) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2043    struct BorrowedClientExecutor<'a> {
2044        client: &'a SimpleAgentsClient,
2045        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2046        run_options: YamlWorkflowRunOptions,
2047    }
2048
2049    #[async_trait]
2050    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2051        async fn complete_structured(
2052            &self,
2053            request: YamlLlmExecutionRequest,
2054            event_sink: Option<&dyn YamlWorkflowEventSink>,
2055        ) -> Result<YamlLlmExecutionResult, String> {
2056            let expects_object = schema_expects_object(&request.schema);
2057            let messages = if let Some(mut history) = request.messages.clone() {
2058                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2059                    history.push(Message::user(&request.prompt));
2060                }
2061                history
2062            } else {
2063                vec![
2064                    Message::system("You execute workflow classification steps."),
2065                    Message::user(&request.prompt),
2066                ]
2067            };
2068
2069            if !request.tools.is_empty() {
2070                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2071                let mut conversation = messages;
2072                let mut usage_total: Option<YamlLlmTokenUsage> = None;
2073
2074                for roundtrip in 0..=request.max_tool_roundtrips {
2075                    let mut builder = CompletionRequest::builder()
2076                        .model(&request.model)
2077                        .messages(conversation.clone())
2078                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2079
2080                    if request.heal && expects_object {
2081                        builder = builder.json_schema("workflow_step", request.schema.clone());
2082                    }
2083
2084                    if let Some(choice) = request.tool_choice.clone() {
2085                        builder = builder.tool_choice(choice);
2086                    }
2087
2088                    if request.stream {
2089                        builder = builder.stream(true);
2090                    }
2091
2092                    let completion_request = builder
2093                        .build()
2094                        .map_err(|error| format!("failed to build completion request: {error}"))?;
2095
2096                    let outcome = self
2097                        .client
2098                        .complete(&completion_request, CompletionOptions::default())
2099                        .await
2100                        .map_err(|error| error.to_string())?;
2101
2102                    let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2103                    let mut streamed_content = String::new();
2104                    let mut finish_reason = FinishReason::Stop;
2105
2106                    match outcome {
2107                        CompletionOutcome::Response(response) => {
2108                            if let Some(usage) = usage_total.as_mut() {
2109                                usage.prompt_tokens += response.usage.prompt_tokens;
2110                                usage.completion_tokens += response.usage.completion_tokens;
2111                                usage.total_tokens += response.usage.total_tokens;
2112                            } else {
2113                                usage_total = Some(YamlLlmTokenUsage {
2114                                    prompt_tokens: response.usage.prompt_tokens,
2115                                    completion_tokens: response.usage.completion_tokens,
2116                                    total_tokens: response.usage.total_tokens,
2117                                    thinking_tokens: None,
2118                                });
2119                            }
2120
2121                            let choice = response
2122                                .choices
2123                                .first()
2124                                .ok_or_else(|| "completion returned no choices".to_string())?;
2125                            streamed_content = choice.message.content.clone();
2126                            streamed_tool_calls = choice.message.tool_calls.clone();
2127                            finish_reason = choice.finish_reason;
2128                        }
2129                        CompletionOutcome::HealedJson(healed) => {
2130                            let response = healed.response;
2131                            if let Some(usage) = usage_total.as_mut() {
2132                                usage.prompt_tokens += response.usage.prompt_tokens;
2133                                usage.completion_tokens += response.usage.completion_tokens;
2134                                usage.total_tokens += response.usage.total_tokens;
2135                            } else {
2136                                usage_total = Some(YamlLlmTokenUsage {
2137                                    prompt_tokens: response.usage.prompt_tokens,
2138                                    completion_tokens: response.usage.completion_tokens,
2139                                    total_tokens: response.usage.total_tokens,
2140                                    thinking_tokens: None,
2141                                });
2142                            }
2143
2144                            let choice = response
2145                                .choices
2146                                .first()
2147                                .ok_or_else(|| "completion returned no choices".to_string())?;
2148                            streamed_content = choice.message.content.clone();
2149                            streamed_tool_calls = choice.message.tool_calls.clone();
2150                            finish_reason = choice.finish_reason;
2151                        }
2152                        CompletionOutcome::CoercedSchema(coerced) => {
2153                            let response = coerced.response;
2154                            if let Some(usage) = usage_total.as_mut() {
2155                                usage.prompt_tokens += response.usage.prompt_tokens;
2156                                usage.completion_tokens += response.usage.completion_tokens;
2157                                usage.total_tokens += response.usage.total_tokens;
2158                            } else {
2159                                usage_total = Some(YamlLlmTokenUsage {
2160                                    prompt_tokens: response.usage.prompt_tokens,
2161                                    completion_tokens: response.usage.completion_tokens,
2162                                    total_tokens: response.usage.total_tokens,
2163                                    thinking_tokens: None,
2164                                });
2165                            }
2166
2167                            let choice = response
2168                                .choices
2169                                .first()
2170                                .ok_or_else(|| "completion returned no choices".to_string())?;
2171                            streamed_content = choice.message.content.clone();
2172                            streamed_tool_calls = choice.message.tool_calls.clone();
2173                            finish_reason = choice.finish_reason;
2174                        }
2175                        CompletionOutcome::Stream(mut stream) => {
2176                            let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2177                                None;
2178                            let mut delta_filter = StructuredJsonDeltaFilter::default();
2179                            let include_raw_debug = include_raw_stream_debug_events();
2180                            let mut json_text_formatter = if request.stream_json_as_text {
2181                                Some(StreamJsonAsTextFormatter::default())
2182                            } else {
2183                                None
2184                            };
2185                            let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2186
2187                            while let Some(chunk_result) = stream.next().await {
2188                                if event_sink_is_cancelled(event_sink) {
2189                                    return Err(workflow_event_sink_cancelled_message().to_string());
2190                                }
2191
2192                                let chunk = chunk_result.map_err(|error| error.to_string())?;
2193                                if let Some(usage) = chunk.usage {
2194                                    final_stream_usage = Some(usage);
2195                                }
2196
2197                                if let Some(choice) = chunk.choices.first() {
2198                                    if let Some(chunk_finish_reason) = choice.finish_reason {
2199                                        finish_reason = chunk_finish_reason;
2200                                    }
2201
2202                                    if include_raw_debug {
2203                                        if let Some(reasoning_delta) =
2204                                            choice.delta.reasoning_content.as_ref()
2205                                        {
2206                                            if let Some(sink) = event_sink {
2207                                                sink.emit(&YamlWorkflowEvent {
2208                                                    event_type: "node_stream_thinking_delta"
2209                                                        .to_string(),
2210                                                    node_id: Some(request.node_id.clone()),
2211                                                    step_id: Some(request.node_id.clone()),
2212                                                    node_kind: Some("llm_call".to_string()),
2213                                                    streamable: Some(true),
2214                                                    message: None,
2215                                                    delta: Some(reasoning_delta.clone()),
2216                                                    token_kind: Some(
2217                                                        YamlWorkflowTokenKind::Thinking,
2218                                                    ),
2219                                                    is_terminal_node_token: Some(
2220                                                        request.is_terminal_node,
2221                                                    ),
2222                                                    elapsed_ms: None,
2223                                                    metadata: None,
2224                                                });
2225                                            }
2226                                        }
2227                                    }
2228
2229                                    if let Some(delta) = choice.delta.content.clone() {
2230                                        streamed_content.push_str(delta.as_str());
2231                                        let (output_delta, thinking_delta) = if expects_object {
2232                                            delta_filter.split(delta.as_str())
2233                                        } else {
2234                                            (Some(delta.clone()), None)
2235                                        };
2236                                        let rendered_output_delta = if let Some(output_chunk) =
2237                                            output_delta
2238                                        {
2239                                            if let Some(formatter) = json_text_formatter.as_mut() {
2240                                                formatter.push(output_chunk.as_str());
2241                                                formatter.emit_if_ready(delta_filter.completed())
2242                                            } else {
2243                                                Some(output_chunk)
2244                                            }
2245                                        } else {
2246                                            None
2247                                        };
2248
2249                                        if include_raw_debug {
2250                                            if let Some(sink) = event_sink {
2251                                                if let Some(raw_thinking_delta) =
2252                                                    thinking_delta.as_ref()
2253                                                {
2254                                                    sink.emit(&YamlWorkflowEvent {
2255                                                        event_type: "node_stream_thinking_delta"
2256                                                            .to_string(),
2257                                                        node_id: Some(request.node_id.clone()),
2258                                                        step_id: Some(request.node_id.clone()),
2259                                                        node_kind: Some("llm_call".to_string()),
2260                                                        streamable: Some(true),
2261                                                        message: None,
2262                                                        delta: Some(raw_thinking_delta.clone()),
2263                                                        token_kind: Some(
2264                                                            YamlWorkflowTokenKind::Thinking,
2265                                                        ),
2266                                                        is_terminal_node_token: Some(
2267                                                            request.is_terminal_node,
2268                                                        ),
2269                                                        elapsed_ms: None,
2270                                                        metadata: None,
2271                                                    });
2272                                                }
2273                                                if let Some(raw_output_delta) =
2274                                                    rendered_output_delta.as_ref()
2275                                                {
2276                                                    sink.emit(&YamlWorkflowEvent {
2277                                                        event_type: "node_stream_output_delta"
2278                                                            .to_string(),
2279                                                        node_id: Some(request.node_id.clone()),
2280                                                        step_id: Some(request.node_id.clone()),
2281                                                        node_kind: Some("llm_call".to_string()),
2282                                                        streamable: Some(true),
2283                                                        message: None,
2284                                                        delta: Some(raw_output_delta.clone()),
2285                                                        token_kind: Some(
2286                                                            YamlWorkflowTokenKind::Output,
2287                                                        ),
2288                                                        is_terminal_node_token: Some(
2289                                                            request.is_terminal_node,
2290                                                        ),
2291                                                        elapsed_ms: None,
2292                                                        metadata: None,
2293                                                    });
2294                                                }
2295                                            }
2296                                        }
2297
2298                                        if let Some(filtered_delta) = rendered_output_delta {
2299                                            if let Some(sink) = event_sink {
2300                                                sink.emit(&YamlWorkflowEvent {
2301                                                    event_type: "node_stream_delta".to_string(),
2302                                                    node_id: Some(request.node_id.clone()),
2303                                                    step_id: Some(request.node_id.clone()),
2304                                                    node_kind: Some("llm_call".to_string()),
2305                                                    streamable: Some(true),
2306                                                    message: None,
2307                                                    delta: Some(filtered_delta),
2308                                                    token_kind: Some(YamlWorkflowTokenKind::Output),
2309                                                    is_terminal_node_token: Some(
2310                                                        request.is_terminal_node,
2311                                                    ),
2312                                                    elapsed_ms: None,
2313                                                    metadata: None,
2314                                                });
2315                                            }
2316                                        }
2317                                    }
2318
2319                                    if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2320                                    {
2321                                        for tool_call_delta in tool_call_deltas {
2322                                            let entry = tool_calls_by_index
2323                                                .entry(tool_call_delta.index)
2324                                                .or_insert_with(|| ToolCall {
2325                                                    id: tool_call_delta.id.clone().unwrap_or_else(
2326                                                        || {
2327                                                            format!(
2328                                                                "tool_call_{}",
2329                                                                tool_call_delta.index
2330                                                            )
2331                                                        },
2332                                                    ),
2333                                                    tool_type: ToolType::Function,
2334                                                    function:
2335                                                        simple_agent_type::tool::ToolCallFunction {
2336                                                            name: String::new(),
2337                                                            arguments: String::new(),
2338                                                        },
2339                                                });
2340
2341                                            if let Some(id) = tool_call_delta.id.as_ref() {
2342                                                entry.id = id.clone();
2343                                            }
2344                                            if let Some(tool_type) = tool_call_delta.tool_type {
2345                                                entry.tool_type = tool_type;
2346                                            }
2347                                            if let Some(function_delta) =
2348                                                tool_call_delta.function.as_ref()
2349                                            {
2350                                                if let Some(name) = function_delta.name.as_ref() {
2351                                                    entry.function.name = name.clone();
2352                                                }
2353                                                if let Some(arguments) =
2354                                                    function_delta.arguments.as_ref()
2355                                                {
2356                                                    entry.function.arguments.push_str(arguments);
2357                                                }
2358                                            }
2359                                        }
2360                                    }
2361                                }
2362
2363                                if event_sink_is_cancelled(event_sink) {
2364                                    return Err(workflow_event_sink_cancelled_message().to_string());
2365                                }
2366                            }
2367
2368                            if let Some(usage) = final_stream_usage {
2369                                if let Some(total) = usage_total.as_mut() {
2370                                    total.prompt_tokens += usage.prompt_tokens;
2371                                    total.completion_tokens += usage.completion_tokens;
2372                                    total.total_tokens += usage.total_tokens;
2373                                } else {
2374                                    usage_total = Some(YamlLlmTokenUsage {
2375                                        prompt_tokens: usage.prompt_tokens,
2376                                        completion_tokens: usage.completion_tokens,
2377                                        total_tokens: usage.total_tokens,
2378                                        thinking_tokens: None,
2379                                    });
2380                                }
2381                            }
2382
2383                            let mut ordered_tool_calls =
2384                                tool_calls_by_index.into_iter().collect::<Vec<_>>();
2385                            ordered_tool_calls.sort_by_key(|(index, _)| *index);
2386                            if !ordered_tool_calls.is_empty() {
2387                                streamed_tool_calls = Some(
2388                                    ordered_tool_calls
2389                                        .into_iter()
2390                                        .map(|(_, tool_call)| tool_call)
2391                                        .collect::<Vec<_>>(),
2392                                );
2393                            }
2394                        }
2395                    }
2396
2397                    let has_tool_calls = streamed_tool_calls
2398                        .as_ref()
2399                        .is_some_and(|calls| !calls.is_empty());
2400                    if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
2401                        let payload = if expects_object {
2402                            parse_streamed_structured_payload(
2403                                streamed_content.as_str(),
2404                                request.heal,
2405                            )
2406                            .map_err(|error| {
2407                                format!("failed to parse structured completion JSON: {error}")
2408                            })?
2409                            .payload
2410                        } else {
2411                            Value::String(streamed_content.clone())
2412                        };
2413                        return Ok(YamlLlmExecutionResult {
2414                            payload,
2415                            usage: usage_total,
2416                            ttft_ms: None,
2417                            tool_calls: tool_traces,
2418                        });
2419                    }
2420
2421                    if roundtrip >= request.max_tool_roundtrips {
2422                        return Err(format!(
2423                            "tool call roundtrip limit reached for node '{}' (max={})",
2424                            request.node_id, request.max_tool_roundtrips
2425                        ));
2426                    }
2427
2428                    let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
2429                        "finish_reason=tool_calls but no tool calls found".to_string()
2430                    })?;
2431                    if tool_calls
2432                        .iter()
2433                        .any(|tool_call| tool_call.function.name.trim().is_empty())
2434                    {
2435                        return Err("streamed tool call missing function name".to_string());
2436                    }
2437
2438                    let assistant_tool_message =
2439                        Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
2440                    conversation.push(assistant_tool_message);
2441
2442                    for tool_call in tool_calls {
2443                        let tool_call_id = tool_call.id.clone();
2444                        let tool_name = tool_call.function.name.clone();
2445                        let tool_started = Instant::now();
2446                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
2447                            .map_err(|error| {
2448                                format!(
2449                                    "tool '{}' arguments must be valid JSON: {}",
2450                                    tool_name, error
2451                                )
2452                            })?;
2453                        let mut tool_span_context: Option<TraceContext> = None;
2454                        let mut tool_span = if request.trace_sampled {
2455                            let (span_context, mut span) = workflow_tracer().start_span(
2456                                "workflow.tool.execute",
2457                                SpanKind::Node,
2458                                request.trace_context.as_ref(),
2459                            );
2460                            tool_span_context = Some(span_context);
2461                            span.set_attribute(
2462                                "trace_id",
2463                                request.trace_id.as_deref().unwrap_or_default(),
2464                            );
2465                            span.set_attribute("node_id", request.node_id.as_str());
2466                            span.set_attribute("node_kind", "llm_call");
2467                            span.set_attribute("tool_name", tool_name.as_str());
2468                            span.set_attribute("tool_call_id", tool_call_id.as_str());
2469                            let args_for_span =
2470                                payload_for_tool_trace(request.tool_trace_mode, &arguments)
2471                                    .to_string();
2472                            span.set_attribute("tool_arguments", args_for_span.as_str());
2473                            Some(span)
2474                        } else {
2475                            None
2476                        };
2477
2478                        if request.tool_trace_mode != YamlToolTraceMode::Off {
2479                            if let Some(sink) = event_sink {
2480                                sink.emit(&YamlWorkflowEvent {
2481                                    event_type: "node_tool_call_requested".to_string(),
2482                                    node_id: Some(request.node_id.clone()),
2483                                    step_id: Some(request.node_id.clone()),
2484                                    node_kind: Some("llm_call".to_string()),
2485                                    streamable: Some(false),
2486                                    message: Some(format!(
2487                                        "tool call requested: {}",
2488                                        tool_name
2489                                    )),
2490                                    delta: None,
2491                                    token_kind: None,
2492                                    is_terminal_node_token: None,
2493                                    elapsed_ms: None,
2494                                    metadata: Some(json!({
2495                                        "tool_call_id": tool_call_id.clone(),
2496                                        "tool_name": tool_name.clone(),
2497                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2498                                    })),
2499                                });
2500                            }
2501                        }
2502
2503                        let Some(tool_config) = request
2504                            .tools
2505                            .iter()
2506                            .find(|tool| tool.definition.function.name == tool_name)
2507                        else {
2508                            return Err(format!("model requested unknown tool '{}'", tool_name));
2509                        };
2510
2511                        let tool_output_result = if tool_name == "run_workflow_graph" {
2512                            execute_subworkflow_tool_call(
2513                                &arguments,
2514                                &request.execution_context,
2515                                self.client,
2516                                self.custom_worker,
2517                                &self.run_options,
2518                                tool_span_context.as_ref(),
2519                                request.trace_id.as_deref(),
2520                            )
2521                            .await
2522                        } else if let Some(custom_worker) = self.custom_worker {
2523                            custom_worker
2524                                .execute(
2525                                    tool_name.as_str(),
2526                                    &arguments,
2527                                    request.email_text.as_str(),
2528                                    &request.execution_context,
2529                                )
2530                                .await
2531                        } else {
2532                            Err(format!(
2533                                "tool '{}' requested but no custom worker executor is configured",
2534                                tool_name
2535                            ))
2536                        };
2537
2538                        let tool_output = match tool_output_result {
2539                            Ok(output) => output,
2540                            Err(message) => {
2541                                let elapsed_ms = tool_started.elapsed().as_millis();
2542                                if let Some(span) = tool_span.as_mut() {
2543                                    span.add_event("workflow.tool.execute.error");
2544                                    span.set_attribute("tool_status", "error");
2545                                    span.set_attribute("tool_error", message.as_str());
2546                                    span.set_attribute(
2547                                        "elapsed_ms",
2548                                        elapsed_ms.to_string().as_str(),
2549                                    );
2550                                }
2551                                if request.tool_trace_mode != YamlToolTraceMode::Off {
2552                                    if let Some(sink) = event_sink {
2553                                        sink.emit(&YamlWorkflowEvent {
2554                                            event_type: "node_tool_call_failed".to_string(),
2555                                            node_id: Some(request.node_id.clone()),
2556                                            step_id: Some(request.node_id.clone()),
2557                                            node_kind: Some("llm_call".to_string()),
2558                                            streamable: Some(false),
2559                                            message: Some(message.clone()),
2560                                            delta: None,
2561                                            token_kind: None,
2562                                            is_terminal_node_token: None,
2563                                            elapsed_ms: Some(elapsed_ms),
2564                                            metadata: Some(json!({
2565                                                "tool_call_id": tool_call_id.clone(),
2566                                                "tool_name": tool_name.clone(),
2567                                            })),
2568                                        });
2569                                    }
2570                                }
2571                                tool_traces.push(YamlToolCallTrace {
2572                                    id: tool_call_id.clone(),
2573                                    name: tool_name.clone(),
2574                                    arguments,
2575                                    output: None,
2576                                    status: "error".to_string(),
2577                                    elapsed_ms,
2578                                    error: Some(message.clone()),
2579                                });
2580                                if let Some(span) = tool_span.take() {
2581                                    span.end();
2582                                }
2583                                return Err(format!("tool '{}' failed: {}", tool_name, message));
2584                            }
2585                        };
2586
2587                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
2588                            validate_schema_instance(output_schema, &tool_output).map_err(
2589                                |message| {
2590                                    format!(
2591                                        "tool '{}' output failed schema validation: {}",
2592                                        tool_name, message
2593                                    )
2594                                },
2595                            )?;
2596                        }
2597
2598                        let elapsed_ms = tool_started.elapsed().as_millis();
2599                        if let Some(span) = tool_span.as_mut() {
2600                            span.add_event("workflow.tool.execute.completed");
2601                            span.set_attribute("tool_status", "ok");
2602                            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
2603                            let output_for_span =
2604                                payload_for_tool_trace(request.tool_trace_mode, &tool_output)
2605                                    .to_string();
2606                            span.set_attribute("tool_output", output_for_span.as_str());
2607                        }
2608                        if request.tool_trace_mode != YamlToolTraceMode::Off {
2609                            if let Some(sink) = event_sink {
2610                                sink.emit(&YamlWorkflowEvent {
2611                                    event_type: "node_tool_call_completed".to_string(),
2612                                    node_id: Some(request.node_id.clone()),
2613                                    step_id: Some(request.node_id.clone()),
2614                                    node_kind: Some("llm_call".to_string()),
2615                                    streamable: Some(false),
2616                                    message: Some(format!(
2617                                        "tool call completed: {}",
2618                                        tool_name
2619                                    )),
2620                                    delta: None,
2621                                    token_kind: None,
2622                                    is_terminal_node_token: None,
2623                                    elapsed_ms: Some(elapsed_ms),
2624                                    metadata: Some(json!({
2625                                        "tool_call_id": tool_call_id.clone(),
2626                                        "tool_name": tool_name.clone(),
2627                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2628                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
2629                                    })),
2630                                });
2631                            }
2632                        }
2633
2634                        tool_traces.push(YamlToolCallTrace {
2635                            id: tool_call_id.clone(),
2636                            name: tool_name.clone(),
2637                            arguments: arguments.clone(),
2638                            output: Some(tool_output.clone()),
2639                            status: "ok".to_string(),
2640                            elapsed_ms,
2641                            error: None,
2642                        });
2643
2644                        conversation.push(Message::tool(
2645                            serde_json::to_string(&tool_output).map_err(|error| {
2646                                format!("failed to serialize tool output: {error}")
2647                            })?,
2648                            tool_call_id,
2649                        ));
2650                        if let Some(span) = tool_span.take() {
2651                            span.end();
2652                        }
2653                    }
2654
2655                    if request.tool_trace_mode != YamlToolTraceMode::Off {
2656                        if let Some(sink) = event_sink {
2657                            sink.emit(&YamlWorkflowEvent {
2658                                event_type: "node_tool_roundtrip_completed".to_string(),
2659                                node_id: Some(request.node_id.clone()),
2660                                step_id: Some(request.node_id.clone()),
2661                                node_kind: Some("llm_call".to_string()),
2662                                streamable: Some(false),
2663                                message: Some(format!(
2664                                    "tool roundtrip {} completed",
2665                                    roundtrip + 1
2666                                )),
2667                                delta: None,
2668                                token_kind: None,
2669                                is_terminal_node_token: None,
2670                                elapsed_ms: None,
2671                                metadata: Some(json!({
2672                                    "roundtrip": roundtrip + 1,
2673                                    "max_tool_roundtrips": request.max_tool_roundtrips,
2674                                })),
2675                            });
2676                        }
2677                    }
2678                }
2679
2680                return Err(format!(
2681                    "tool-enabled llm_call '{}' exhausted loop without final payload",
2682                    request.node_id
2683                ));
2684            }
2685
2686            let mut builder = CompletionRequest::builder()
2687                .model(&request.model)
2688                .messages(messages);
2689
2690            if request.heal && !request.stream && expects_object {
2691                builder = builder.json_schema("workflow_step", request.schema.clone());
2692            }
2693
2694            if request.stream {
2695                builder = builder.stream(true);
2696            }
2697
2698            let completion_request = builder
2699                .build()
2700                .map_err(|error| format!("failed to build completion request: {error}"))?;
2701
2702            let completion_options = if request.heal && !request.stream && expects_object {
2703                CompletionOptions {
2704                    mode: CompletionMode::HealedJson,
2705                }
2706            } else {
2707                CompletionOptions::default()
2708            };
2709
2710            let outcome = self
2711                .client
2712                .complete(&completion_request, completion_options)
2713                .await
2714                .map_err(|error| error.to_string())?;
2715
2716            match outcome {
2717                CompletionOutcome::Stream(mut stream) => {
2718                    let mut aggregated = String::new();
2719                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
2720                    let stream_started = Instant::now();
2721                    let mut ttft_ms: Option<u128> = None;
2722                    let mut delta_filter = StructuredJsonDeltaFilter::default();
2723                    let include_raw_debug = include_raw_stream_debug_events();
2724                    let mut json_text_formatter = if request.stream_json_as_text {
2725                        Some(StreamJsonAsTextFormatter::default())
2726                    } else {
2727                        None
2728                    };
2729                    while let Some(chunk_result) = stream.next().await {
2730                        if event_sink_is_cancelled(event_sink) {
2731                            return Err(workflow_event_sink_cancelled_message().to_string());
2732                        }
2733                        let chunk = chunk_result.map_err(|error| error.to_string())?;
2734                        if let Some(usage) = chunk.usage {
2735                            final_stream_usage = Some(usage);
2736                        }
2737                        if let Some(choice) = chunk.choices.first() {
2738                            if ttft_ms.is_none()
2739                                && (choice
2740                                    .delta
2741                                    .content
2742                                    .as_ref()
2743                                    .is_some_and(|delta| !delta.is_empty())
2744                                    || choice
2745                                        .delta
2746                                        .reasoning_content
2747                                        .as_ref()
2748                                        .is_some_and(|delta| !delta.is_empty()))
2749                            {
2750                                ttft_ms = Some(stream_started.elapsed().as_millis());
2751                            }
2752                            if include_raw_debug {
2753                                if let Some(reasoning_delta) =
2754                                    choice.delta.reasoning_content.as_ref()
2755                                {
2756                                    if let Some(sink) = event_sink {
2757                                        sink.emit(&YamlWorkflowEvent {
2758                                            event_type: "node_stream_thinking_delta".to_string(),
2759                                            node_id: Some(request.node_id.clone()),
2760                                            step_id: Some(request.node_id.clone()),
2761                                            node_kind: Some("llm_call".to_string()),
2762                                            streamable: Some(true),
2763                                            message: None,
2764                                            delta: Some(reasoning_delta.clone()),
2765                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
2766                                            is_terminal_node_token: Some(request.is_terminal_node),
2767                                            elapsed_ms: None,
2768                                            metadata: None,
2769                                        });
2770                                    }
2771                                }
2772                            }
2773                            if let Some(delta) = choice.delta.content.clone() {
2774                                aggregated.push_str(delta.as_str());
2775                                let (output_delta, thinking_delta) = if expects_object {
2776                                    delta_filter.split(delta.as_str())
2777                                } else {
2778                                    (Some(delta.clone()), None)
2779                                };
2780                                let rendered_output_delta = if let Some(output_chunk) = output_delta
2781                                {
2782                                    if let Some(formatter) = json_text_formatter.as_mut() {
2783                                        formatter.push(output_chunk.as_str());
2784                                        formatter.emit_if_ready(delta_filter.completed())
2785                                    } else {
2786                                        Some(output_chunk)
2787                                    }
2788                                } else {
2789                                    None
2790                                };
2791                                if include_raw_debug {
2792                                    if let Some(sink) = event_sink {
2793                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
2794                                            sink.emit(&YamlWorkflowEvent {
2795                                                event_type: "node_stream_thinking_delta"
2796                                                    .to_string(),
2797                                                node_id: Some(request.node_id.clone()),
2798                                                step_id: Some(request.node_id.clone()),
2799                                                node_kind: Some("llm_call".to_string()),
2800                                                streamable: Some(true),
2801                                                message: None,
2802                                                delta: Some(raw_thinking_delta.clone()),
2803                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
2804                                                is_terminal_node_token: Some(
2805                                                    request.is_terminal_node,
2806                                                ),
2807                                                elapsed_ms: None,
2808                                                metadata: None,
2809                                            });
2810                                        }
2811                                        if let Some(raw_output_delta) =
2812                                            rendered_output_delta.as_ref()
2813                                        {
2814                                            sink.emit(&YamlWorkflowEvent {
2815                                                event_type: "node_stream_output_delta".to_string(),
2816                                                node_id: Some(request.node_id.clone()),
2817                                                step_id: Some(request.node_id.clone()),
2818                                                node_kind: Some("llm_call".to_string()),
2819                                                streamable: Some(true),
2820                                                message: None,
2821                                                delta: Some(raw_output_delta.clone()),
2822                                                token_kind: Some(YamlWorkflowTokenKind::Output),
2823                                                is_terminal_node_token: Some(
2824                                                    request.is_terminal_node,
2825                                                ),
2826                                                elapsed_ms: None,
2827                                                metadata: None,
2828                                            });
2829                                        }
2830                                    }
2831                                }
2832                                if let Some(filtered_delta) = rendered_output_delta {
2833                                    if let Some(sink) = event_sink {
2834                                        sink.emit(&YamlWorkflowEvent {
2835                                            event_type: "node_stream_delta".to_string(),
2836                                            node_id: Some(request.node_id.clone()),
2837                                            step_id: Some(request.node_id.clone()),
2838                                            node_kind: Some("llm_call".to_string()),
2839                                            streamable: Some(true),
2840                                            message: None,
2841                                            delta: Some(filtered_delta),
2842                                            token_kind: Some(YamlWorkflowTokenKind::Output),
2843                                            is_terminal_node_token: Some(request.is_terminal_node),
2844                                            elapsed_ms: None,
2845                                            metadata: None,
2846                                        });
2847                                    }
2848                                }
2849                            }
2850                        }
2851
2852                        if event_sink_is_cancelled(event_sink) {
2853                            return Err(workflow_event_sink_cancelled_message().to_string());
2854                        }
2855                    }
2856
2857                    let payload = if expects_object {
2858                        let resolved =
2859                            parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
2860                        if let Some(confidence) = resolved.heal_confidence {
2861                            if let Some(sink) = event_sink {
2862                                sink.emit(&YamlWorkflowEvent {
2863                                    event_type: "node_healed".to_string(),
2864                                    node_id: Some(request.node_id.clone()),
2865                                    step_id: Some(request.node_id.clone()),
2866                                    node_kind: Some("llm_call".to_string()),
2867                                    streamable: Some(true),
2868                                    message: Some(format!(
2869                                        "healed streamed structured response confidence={confidence}"
2870                                    )),
2871                                    delta: None,
2872                                    token_kind: None,
2873                                    is_terminal_node_token: None,
2874                                    elapsed_ms: None,
2875                                    metadata: None,
2876                                });
2877                            }
2878                        }
2879                        resolved.payload
2880                    } else {
2881                        Value::String(aggregated)
2882                    };
2883
2884                    Ok(YamlLlmExecutionResult {
2885                        payload,
2886                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
2887                            prompt_tokens: usage.prompt_tokens,
2888                            completion_tokens: usage.completion_tokens,
2889                            total_tokens: usage.total_tokens,
2890                            thinking_tokens: None,
2891                        }),
2892                        ttft_ms,
2893                        tool_calls: Vec::new(),
2894                    })
2895                }
2896                CompletionOutcome::Response(response) => {
2897                    let payload = if expects_object {
2898                        let content = response
2899                            .content()
2900                            .ok_or_else(|| "completion returned empty content".to_string())?;
2901                        serde_json::from_str(content).map_err(|error| {
2902                            format!("failed to parse structured completion JSON: {error}")
2903                        })?
2904                    } else {
2905                        Value::String(response.content().unwrap_or_default().to_string())
2906                    };
2907
2908                    Ok(YamlLlmExecutionResult {
2909                        payload,
2910                        usage: Some(YamlLlmTokenUsage {
2911                            prompt_tokens: response.usage.prompt_tokens,
2912                            completion_tokens: response.usage.completion_tokens,
2913                            total_tokens: response.usage.total_tokens,
2914                            thinking_tokens: None,
2915                        }),
2916                        ttft_ms: None,
2917                        tool_calls: Vec::new(),
2918                    })
2919                }
2920                CompletionOutcome::HealedJson(healed) => {
2921                    if !expects_object {
2922                        return Err(
2923                            "healed json outcome is unsupported for non-object schema".to_string()
2924                        );
2925                    }
2926                    if let Some(sink) = event_sink {
2927                        sink.emit(&YamlWorkflowEvent {
2928                            event_type: "node_healed".to_string(),
2929                            node_id: Some(request.node_id.clone()),
2930                            step_id: Some(request.node_id.clone()),
2931                            node_kind: Some("llm_call".to_string()),
2932                            streamable: Some(request.stream),
2933                            message: Some(format!(
2934                                "healed structured response confidence={}",
2935                                healed.parsed.confidence
2936                            )),
2937                            delta: None,
2938                            token_kind: None,
2939                            is_terminal_node_token: None,
2940                            elapsed_ms: None,
2941                            metadata: None,
2942                        });
2943                    }
2944                    Ok(YamlLlmExecutionResult {
2945                        payload: healed.parsed.value,
2946                        usage: Some(YamlLlmTokenUsage {
2947                            prompt_tokens: healed.response.usage.prompt_tokens,
2948                            completion_tokens: healed.response.usage.completion_tokens,
2949                            total_tokens: healed.response.usage.total_tokens,
2950                            thinking_tokens: None,
2951                        }),
2952                        ttft_ms: None,
2953                        tool_calls: Vec::new(),
2954                    })
2955                }
2956                CompletionOutcome::CoercedSchema(coerced) => {
2957                    if !expects_object {
2958                        return Err(
2959                            "coerced schema outcome is unsupported for non-object schema"
2960                                .to_string(),
2961                        );
2962                    }
2963                    Ok(YamlLlmExecutionResult {
2964                        payload: coerced.coerced.value,
2965                        usage: Some(YamlLlmTokenUsage {
2966                            prompt_tokens: coerced.response.usage.prompt_tokens,
2967                            completion_tokens: coerced.response.usage.completion_tokens,
2968                            total_tokens: coerced.response.usage.total_tokens,
2969                            thinking_tokens: None,
2970                        }),
2971                        ttft_ms: None,
2972                        tool_calls: Vec::new(),
2973                    })
2974                }
2975            }
2976        }
2977    }
2978
2979    let executor = BorrowedClientExecutor {
2980        client,
2981        custom_worker,
2982        run_options: options.clone(),
2983    };
2984    run_workflow_yaml_with_custom_worker_and_events_and_options(
2985        workflow,
2986        workflow_input,
2987        &executor,
2988        custom_worker,
2989        event_sink,
2990        options,
2991    )
2992    .await
2993}
2994
2995pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
2996    workflow: &YamlWorkflow,
2997    email_text: &str,
2998    client: &SimpleAgentsClient,
2999    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3000    event_sink: Option<&dyn YamlWorkflowEventSink>,
3001) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3002    let workflow_input = json!({ "email_text": email_text });
3003    run_workflow_yaml_with_client_and_custom_worker_and_events(
3004        workflow,
3005        &workflow_input,
3006        client,
3007        custom_worker,
3008        event_sink,
3009    )
3010    .await
3011}
3012
3013pub async fn run_workflow_yaml(
3014    workflow: &YamlWorkflow,
3015    workflow_input: &Value,
3016    executor: &dyn YamlWorkflowLlmExecutor,
3017) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3018    run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3019        .await
3020}
3021
3022pub async fn run_email_workflow_yaml(
3023    workflow: &YamlWorkflow,
3024    email_text: &str,
3025    executor: &dyn YamlWorkflowLlmExecutor,
3026) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3027    let workflow_input = json!({ "email_text": email_text });
3028    run_workflow_yaml(workflow, &workflow_input, executor).await
3029}
3030
3031pub async fn run_workflow_yaml_with_custom_worker(
3032    workflow: &YamlWorkflow,
3033    workflow_input: &Value,
3034    executor: &dyn YamlWorkflowLlmExecutor,
3035    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3036) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3037    run_workflow_yaml_with_custom_worker_and_events(
3038        workflow,
3039        workflow_input,
3040        executor,
3041        custom_worker,
3042        None,
3043    )
3044    .await
3045}
3046
3047pub async fn run_email_workflow_yaml_with_custom_worker(
3048    workflow: &YamlWorkflow,
3049    email_text: &str,
3050    executor: &dyn YamlWorkflowLlmExecutor,
3051    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3052) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3053    let workflow_input = json!({ "email_text": email_text });
3054    run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3055}
3056
3057pub async fn run_workflow_yaml_with_custom_worker_and_events(
3058    workflow: &YamlWorkflow,
3059    workflow_input: &Value,
3060    executor: &dyn YamlWorkflowLlmExecutor,
3061    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3062    event_sink: Option<&dyn YamlWorkflowEventSink>,
3063) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3064    run_workflow_yaml_with_custom_worker_and_events_and_options(
3065        workflow,
3066        workflow_input,
3067        executor,
3068        custom_worker,
3069        event_sink,
3070        &YamlWorkflowRunOptions::default(),
3071    )
3072    .await
3073}
3074
/// Core interpreter for a YAML-defined workflow.
///
/// Validates the input and the workflow graph, first tries to lower the
/// workflow to the IR runtime, and otherwise walks the node graph directly
/// (`llm_call` / `switch` / `custom_worker` nodes) following at most one
/// outgoing edge per node until a terminal node is reached. Along the way it
/// accumulates per-node outputs, step timings, token usage, telemetry spans,
/// and emits lifecycle events to the optional `event_sink`.
///
/// Errors: `InvalidInput` (non-object input), `Validation` (error-severity
/// diagnostics), `EmptyNodes` / `MissingEntry` / `MissingNode` (graph
/// problems), `Llm` / `CustomWorker` / `UnsupportedNodeType` (node execution),
/// and `EventSinkCancelled` whenever the sink reports cancellation at one of
/// the cooperative checkpoints.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    // The template context exposes the input under "input"; it must be an object.
    if !workflow_input.is_object() {
        return Err(YamlWorkflowRunError::InvalidInput {
            message: "workflow input must be a JSON object".to_string(),
        });
    }

    validate_sample_rate(options.telemetry.sample_rate)?;

    // Optional convenience field; absent -> empty string.
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default();

    // Static validation: refuse to run if any error-severity diagnostic exists
    // (lesser severities are tolerated).
    let diagnostics = verify_yaml_workflow(workflow);
    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
        .iter()
        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
        .cloned()
        .collect();
    if !errors.is_empty() {
        return Err(YamlWorkflowRunError::Validation {
            diagnostics_count: errors.len(),
            diagnostics: errors,
        });
    }

    // Fast path: if the workflow lowers to the IR runtime, run it there.
    // `Ok(None)` means the IR path does not support this workflow and we fall
    // through to the interpreter below.
    if let Some(output) =
        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
            .await?
    {
        return Ok(output);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Root workflow span is opened only when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // NOTE(review): every error return from here on drops `workflow_span`
    // without calling `.end()` or `flush_workflow_tracer()` — confirm the
    // tracer tolerates dropped, unended spans.
    if workflow.nodes.is_empty() {
        return Err(YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        });
    }

    // Index nodes by id for O(1) lookup while walking the graph.
    let node_map: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();
    if !node_map.contains_key(workflow.entry_node.as_str()) {
        return Err(YamlWorkflowRunError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // At most one outgoing edge per node is kept (later duplicates overwrite
    // earlier ones); a node with no entry here is terminal.
    let edge_map: HashMap<&str, &str> = workflow
        .edges
        .iter()
        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
        .collect();

    // Per-run accumulators: visited-node trace, per-node outputs, shared
    // globals, timings, per-LLM-node metrics and models, and token totals.
    let mut current = workflow.entry_node.clone();
    let mut trace = Vec::new();
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    let mut globals = serde_json::Map::new();
    let mut step_timings = Vec::new();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    let mut llm_node_models: BTreeMap<String, String> = BTreeMap::new();
    let mut token_totals = YamlTokenTotals::default();
    let mut workflow_ttft_ms: Option<u128> = None;
    let started = Instant::now();

    if let Some(sink) = event_sink {
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_started".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("workflow_id={}", workflow.id)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(0),
            metadata: None,
        });
    }

    // Cooperative cancellation: the sink is polled at each checkpoint.
    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Main interpreter loop: execute the current node, record output/metrics,
    // then follow its single outgoing edge (or stop at a terminal node).
    loop {
        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let node =
            *node_map
                .get(current.as_str())
                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
                    node_id: current.clone(),
                })?;

        trace.push(node.id.clone());
        let step_started = Instant::now();

        // Per-node span, again gated on sampling.
        let mut node_span_context: Option<TraceContext> = None;
        let mut node_span = if telemetry_context.sampled {
            let (span_context, mut span) = tracer.start_span(
                "workflow.node.execute",
                SpanKind::Node,
                workflow_span_context.as_ref(),
            );
            node_span_context = Some(span_context);
            span.set_attribute(
                "trace_id",
                telemetry_context.trace_id.as_deref().unwrap_or_default(),
            );
            span.set_attribute("node_id", node.id.as_str());
            span.set_attribute("node_kind", node.kind_name());
            Some(span)
        } else {
            None
        };

        // A node is streamable only if it is an LLM call with stream=true and
        // healing disabled; non-LLM nodes yield None.
        let node_streamable = node
            .node_type
            .llm_call
            .as_ref()
            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_started".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: if node_streamable == Some(false) {
                    Some("Node is not streamable; status events only".to_string())
                } else {
                    None
                },
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(workflow_elapsed_before_node_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let mut node_usage: Option<YamlLlmTokenUsage> = None;
        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
        // Dispatch on node type; `next` is the id of the follow-up node, if any.
        let next = if let Some(llm) = &node.node_type.llm_call {
            // --- LLM call node --------------------------------------------
            let prompt_template = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.prompt.as_deref())
                .unwrap_or_default();
            // Templates can reference the workflow input, prior node outputs,
            // and the mutable globals map.
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            let messages = if let Some(path) = llm.messages_path.as_deref() {
                Some(
                    parse_messages_from_context(path, &context).map_err(|message| {
                        YamlWorkflowRunError::Llm {
                            node_id: node.id.clone(),
                            message,
                        }
                    })?,
                )
            } else {
                None
            };
            let prompt_bindings = collect_template_bindings(prompt_template, &context);
            let prompt = interpolate_template(prompt_template, &context);
            let schema = llm_output_schema_for_node(node);

            let request = YamlLlmExecutionRequest {
                node_id: node.id.clone(),
                is_terminal_node,
                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
                // A run-level model override takes precedence over the node's model.
                model: resolve_requested_model(options.model.as_deref(), &llm.model),
                messages,
                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
                prompt,
                prompt_template: prompt_template.to_string(),
                prompt_bindings,
                schema,
                stream: llm.stream.unwrap_or(false),
                heal: llm.heal.unwrap_or(false),
                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?,
                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
                    YamlWorkflowRunError::Llm {
                        node_id: node.id.clone(),
                        message,
                    }
                })?,
                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
                tool_calls_global_key: llm.tool_calls_global_key.clone(),
                tool_trace_mode: options.telemetry.tool_trace_mode,
                execution_context: context.clone(),
                email_text: email_text.to_string(),
                trace_id: telemetry_context.trace_id.clone(),
                trace_context: node_span_context.clone(),
                trace_sampled: telemetry_context.sampled,
            };

            if let Some(span) = node_span.as_mut() {
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &context).as_str(),
                );
            }

            if let Some(sink) = event_sink {
                sink.emit(&YamlWorkflowEvent {
                    event_type: "node_llm_input_resolved".to_string(),
                    node_id: Some(node.id.clone()),
                    step_id: Some(node.id.clone()),
                    node_kind: Some("llm_call".to_string()),
                    streamable: Some(request.stream),
                    message: Some("resolved llm input for telemetry".to_string()),
                    delta: None,
                    token_kind: None,
                    is_terminal_node_token: None,
                    elapsed_ms: Some(started.elapsed().as_millis()),
                    metadata: Some(json!({
                        "model": request.model.clone(),
                        "stream_requested": request.stream,
                        "stream_json_as_text": request.stream_json_as_text,
                        "heal_requested": request.heal,
                        "effective_stream": request.stream,
                        "prompt_template": request.prompt_template.clone(),
                        "prompt": request.prompt.clone(),
                        "schema": request.schema.clone(),
                        "bindings": request.prompt_bindings.clone(),
                        "tools_count": request.tools.len(),
                        "max_tool_roundtrips": request.max_tool_roundtrips,
                    })),
                });
            }

            llm_node_models.insert(node.id.clone(), request.model.clone());

            if event_sink_is_cancelled(event_sink) {
                return Err(YamlWorkflowRunError::EventSinkCancelled {
                    message: workflow_event_sink_cancelled_message().to_string(),
                });
            }

            let llm_result = executor
                .complete_structured(request, event_sink)
                .await
                .map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?;

            if let Some(usage) = llm_result.usage.as_ref() {
                token_totals.add_usage(usage);
            }
            // Workflow-level TTFT = first node's TTFT offset by the time spent
            // before that node started; only the first reporting node counts.
            if workflow_ttft_ms.is_none() {
                workflow_ttft_ms = llm_result
                    .ttft_ms
                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
            }
            node_usage = llm_result.usage;

            let payload = llm_result.payload;
            let tool_calls = llm_result.tool_calls;

            // Node output shape: { "output": payload, "tool_calls": [...]? }.
            let mut node_output = json!({ "output": payload });
            if !tool_calls.is_empty() {
                if let Some(output_obj) = node_output.as_object_mut() {
                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
                }
            }
            outputs.insert(node.id.clone(), node_output);
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            // Optionally mirror this node's tool calls into a named global.
            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if let Some(node_tool_calls) = outputs
                    .get(node.id.as_str())
                    .and_then(|value| value.get("tool_calls"))
                    .cloned()
                {
                    globals.insert(global_key.clone(), node_tool_calls);
                }
            }
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else if let Some(switch) = &node.node_type.switch {
            // --- Switch node: first branch whose condition holds wins; the
            // default target is used when no branch matches. -----------------
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            let mut chosen = Some(switch.default.clone());
            for branch in &switch.branches {
                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
                    chosen = Some(branch.target.clone());
                    break;
                }
            }
            // NOTE(review): `chosen` starts as Some(default), so this
            // InvalidSwitchTarget branch looks unreachable as written.
            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
                node_id: node.id.clone(),
            })?;
            Some(chosen)
        } else if let Some(custom) = &node.node_type.custom_worker {
            // --- Custom worker node ---------------------------------------
            let payload = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.payload.as_ref())
                .cloned()
                .unwrap_or_else(|| json!({}));
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });

            if let Some(span) = node_span.as_mut() {
                span.set_attribute("handler_name", custom.handler.as_str());
                span.set_attribute(
                    "node_input",
                    payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
                );
            }

            // Dedicated handler.invoke span surrounding the worker call.
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if telemetry_context.sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    workflow_span_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                span.set_attribute(
                    "trace_id",
                    telemetry_context.trace_id.as_deref().unwrap_or_default(),
                );
                span.set_attribute("handler_name", custom.handler.as_str());
                apply_trace_tenant_attributes(span.as_mut(), options);
                Some(span)
            } else {
                None
            };

            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                telemetry_context.trace_id.as_deref(),
                options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &options.trace.tenant,
            );

            // Without a registered worker executor, fall back to a mock output
            // so workflows can be exercised end-to-end.
            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
                custom_worker_executor
                    .execute(
                        custom.handler.as_str(),
                        &payload,
                        email_text,
                        &worker_context,
                    )
                    .await
                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
                        node_id: node.id.clone(),
                        message,
                    })
            } else {
                mock_custom_worker_output(custom.handler.as_str(), &payload)
            };

            // End the handler span before propagating a possible worker error.
            if let Some(span) = handler_span.take() {
                span.end();
            }

            let worker_output = worker_output_result?;

            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    span.set_attribute(
                        "node_output",
                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
                    );
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else {
            return Err(YamlWorkflowRunError::UnsupportedNodeType {
                node_id: node.id.clone(),
            });
        };

        // Record per-step timing and, for LLM nodes, token metrics.
        let node_kind = node.kind_name().to_string();
        let elapsed_ms = step_started.elapsed().as_millis();
        step_timings.push(YamlStepTiming {
            node_id: node.id.clone(),
            node_kind,
            elapsed_ms,
            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
            thinking_tokens: node_usage.as_ref().and_then(|usage| usage.thinking_tokens),
            tokens_per_second: node_usage
                .as_ref()
                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
        });

        if let Some(usage) = node_usage.as_ref() {
            llm_node_metrics.insert(
                node.id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(
                        usage.completion_tokens,
                        elapsed_ms,
                    ),
                },
            );
        }

        if let Some(mut span) = node_span.take() {
            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
            span.add_event("node_completed");
            span.end();
        }

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_completed".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: None,
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(elapsed_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Advance along the outgoing edge, or stop at a terminal node.
        if let Some(next) = next {
            current = next;
            continue;
        }
        break;
    }

    // The terminal node is the last node visited; its "output" field becomes
    // the workflow's terminal output.
    let terminal_node = trace
        .last()
        .cloned()
        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        })?;

    let terminal_output = outputs
        .get(terminal_node.as_str())
        .and_then(|value| value.get("output"))
        .cloned();

    let total_elapsed_ms = started.elapsed().as_millis();
    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text: email_text.to_string(),
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: workflow_ttft_ms,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        // Trace metadata is only attached when a trace id exists.
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    if let Some(sink) = event_sink {
        // Optional "nerdstats" summary attached to the completion event.
        let event_metadata = if options.telemetry.nerdstats {
            Some(json!({
                "nerdstats": workflow_nerdstats(&output),
            }))
        } else {
            None
        };
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_completed".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("terminal_node={}", output.terminal_node)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(output.total_elapsed_ms),
            metadata: event_metadata,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Close and flush the root span on the success path only.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    Ok(output)
}
3677
3678async fn try_run_yaml_via_ir_runtime(
3679    workflow: &YamlWorkflow,
3680    workflow_input: &Value,
3681    executor: &dyn YamlWorkflowLlmExecutor,
3682    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3683    options: &YamlWorkflowRunOptions,
3684) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
3685    let ir = match yaml_workflow_to_ir(workflow) {
3686        Ok(def) => def,
3687        Err(YamlToIrError::UnsupportedNode { .. })
3688        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
3689        Err(err) => {
3690            return Err(YamlWorkflowRunError::InvalidInput {
3691                message: err.to_string(),
3692            });
3693        }
3694    };
3695
3696    let parent_trace_context = trace_context_from_options(options);
3697    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3698
3699    let tracer = workflow_tracer();
3700    let mut workflow_span_context: Option<TraceContext> = None;
3701    let mut workflow_span = if telemetry_context.sampled {
3702        let (span_context, mut span) = tracer.start_span(
3703            "workflow.run",
3704            SpanKind::Workflow,
3705            parent_trace_context.as_ref(),
3706        );
3707        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
3708            span.set_attribute("trace_id", trace_id_value);
3709        }
3710        apply_trace_tenant_attributes(span.as_mut(), options);
3711        workflow_span_context = Some(span_context);
3712        Some(span)
3713    } else {
3714        None
3715    };
3716
3717    struct NoopLlm;
3718    #[async_trait]
3719    impl LlmExecutor for NoopLlm {
3720        async fn execute(
3721            &self,
3722            _input: LlmExecutionInput,
3723        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3724            Err(LlmExecutionError::UnexpectedOutcome(
3725                "yaml_ir_uses_tool_path",
3726            ))
3727        }
3728    }
3729
3730    struct YamlIrToolExecutor<'a> {
3731        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
3732        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
3733        token_totals: std::sync::Mutex<YamlTokenTotals>,
3734        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
3735        node_models: std::sync::Mutex<BTreeMap<String, String>>,
3736        model_override: Option<String>,
3737        trace_id: Option<String>,
3738        trace_context: Option<TraceContext>,
3739        trace_input_context: Option<YamlWorkflowTraceContextInput>,
3740        tenant_context: YamlWorkflowTraceTenantContext,
3741        payload_mode: YamlWorkflowPayloadMode,
3742        trace_sampled: bool,
3743    }
3744
3745    #[async_trait]
3746    impl ToolExecutor for YamlIrToolExecutor<'_> {
3747        async fn execute_tool(
3748            &self,
3749            input: ToolExecutionInput,
3750        ) -> Result<Value, ToolExecutionError> {
3751            let context = build_yaml_context_from_ir_scope(&input.scoped_input);
3752
3753            if input.tool == YAML_LLM_TOOL_ID {
3754                let node_id = input
3755                    .input
3756                    .get("node_id")
3757                    .and_then(Value::as_str)
3758                    .ok_or_else(|| {
3759                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
3760                    })?
3761                    .to_string();
3762                let node_id_for_metrics = node_id.clone();
3763                let model = input
3764                    .input
3765                    .get("model")
3766                    .and_then(Value::as_str)
3767                    .ok_or_else(|| {
3768                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
3769                    })?
3770                    .to_string();
3771                let resolved_model =
3772                    resolve_requested_model(self.model_override.as_deref(), &model);
3773                let prompt_template = input
3774                    .input
3775                    .get("prompt_template")
3776                    .and_then(Value::as_str)
3777                    .unwrap_or_default()
3778                    .to_string();
3779                let stream = input
3780                    .input
3781                    .get("stream")
3782                    .and_then(Value::as_bool)
3783                    .unwrap_or(false);
3784                let heal = input
3785                    .input
3786                    .get("heal")
3787                    .and_then(Value::as_bool)
3788                    .unwrap_or(false);
3789                let append_prompt_as_user = input
3790                    .input
3791                    .get("append_prompt_as_user")
3792                    .and_then(Value::as_bool)
3793                    .unwrap_or(true);
3794                let messages_path = input
3795                    .input
3796                    .get("messages_path")
3797                    .and_then(Value::as_str)
3798                    .map(str::to_string);
3799
3800                let messages = if let Some(path) = messages_path.as_deref() {
3801                    Some(
3802                        parse_messages_from_context(path, &context)
3803                            .map_err(ToolExecutionError::Failed)?,
3804                    )
3805                } else {
3806                    None
3807                };
3808
3809                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
3810                let prompt = interpolate_template(&prompt_template, &context);
3811                let email_text = context
3812                    .get("input")
3813                    .and_then(|v| v.get("email_text"))
3814                    .and_then(Value::as_str)
3815                    .unwrap_or_default();
3816                let schema = input
3817                    .input
3818                    .get("output_schema")
3819                    .cloned()
3820                    .unwrap_or_else(default_llm_output_schema);
3821
3822                let request = YamlLlmExecutionRequest {
3823                    node_id,
3824                    is_terminal_node: false,
3825                    stream_json_as_text: input
3826                        .input
3827                        .get("stream_json_as_text")
3828                        .and_then(Value::as_bool)
3829                        .unwrap_or(false),
3830                    model: resolved_model.clone(),
3831                    messages,
3832                    append_prompt_as_user,
3833                    prompt,
3834                    prompt_template,
3835                    prompt_bindings,
3836                    schema,
3837                    stream,
3838                    heal,
3839                    tools: Vec::new(),
3840                    tool_choice: None,
3841                    max_tool_roundtrips: 1,
3842                    tool_calls_global_key: None,
3843                    tool_trace_mode: YamlToolTraceMode::Off,
3844                    execution_context: context.clone(),
3845                    email_text: email_text.to_string(),
3846                    trace_id: self.trace_id.clone(),
3847                    trace_context: self.trace_context.clone(),
3848                    trace_sampled: self.trace_sampled,
3849                };
3850
3851                let llm_result = self
3852                    .llm_executor
3853                    .complete_structured(request, None)
3854                    .await
3855                    .map_err(ToolExecutionError::Failed);
3856
3857                if let Ok(ref result) = llm_result {
3858                    if let Some(usage) = result.usage.as_ref() {
3859                        if let Ok(mut totals) = self.token_totals.lock() {
3860                            totals.add_usage(usage);
3861                        }
3862                        if let Ok(mut usage_map) = self.node_usage.lock() {
3863                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
3864                        }
3865                    }
3866                    if let Ok(mut model_map) = self.node_models.lock() {
3867                        model_map.insert(node_id_for_metrics, resolved_model);
3868                    }
3869                }
3870
3871                return llm_result.map(|result| result.payload);
3872            }
3873
3874            let worker = self
3875                .custom_worker
3876                .ok_or_else(|| ToolExecutionError::NotFound {
3877                    tool: input.tool.clone(),
3878                })?;
3879
3880            let payload = input.input.clone();
3881            let email_text = context
3882                .get("input")
3883                .and_then(|v| v.get("email_text"))
3884                .and_then(Value::as_str)
3885                .unwrap_or_default();
3886
3887            let tracer = workflow_tracer();
3888            let mut handler_span_context: Option<TraceContext> = None;
3889            let mut handler_span = if self.trace_sampled {
3890                let (span_context, mut span) = tracer.start_span(
3891                    "handler.invoke",
3892                    SpanKind::Node,
3893                    self.trace_context.as_ref(),
3894                );
3895                handler_span_context = Some(span_context);
3896                if let Some(trace_id) = self.trace_id.as_ref() {
3897                    span.set_attribute("trace_id", trace_id.as_str());
3898                }
3899                span.set_attribute("handler_name", input.tool.as_str());
3900                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
3901                    span.set_attribute("tenant.workspace_id", workspace_id);
3902                }
3903                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
3904                    span.set_attribute("tenant.user_id", user_id);
3905                }
3906                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
3907                    span.set_attribute("tenant.conversation_id", conversation_id);
3908                }
3909                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
3910                    span.set_attribute("tenant.request_id", request_id);
3911                }
3912                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
3913                    span.set_attribute("tenant.run_id", run_id);
3914                }
3915                span.set_attribute(
3916                    "node_input",
3917                    payload_for_span(self.payload_mode, &payload).as_str(),
3918                );
3919                Some(span)
3920            } else {
3921                None
3922            };
3923
3924            let trace_options = YamlWorkflowRunOptions {
3925                telemetry: YamlWorkflowTelemetryConfig::default(),
3926                trace: YamlWorkflowTraceOptions {
3927                    context: self.trace_input_context.clone(),
3928                    tenant: self.tenant_context.clone(),
3929                },
3930                model: None,
3931            };
3932            let worker_trace_context = merged_trace_context_for_worker(
3933                handler_span_context.as_ref(),
3934                self.trace_id.as_deref(),
3935                &trace_options,
3936            );
3937            let worker_context = custom_worker_context_with_trace(
3938                &context,
3939                &worker_trace_context,
3940                &self.tenant_context,
3941            );
3942
3943            let output_result = worker
3944                .execute(&input.tool, &payload, email_text, &worker_context)
3945                .await
3946                .map_err(ToolExecutionError::Failed);
3947
3948            if let Some(span) = handler_span.as_mut() {
3949                if output_result.is_ok() {
3950                    span.add_event("handler.success");
3951                } else {
3952                    span.add_event("handler.error");
3953                }
3954            }
3955
3956            if let Some(span) = handler_span.take() {
3957                span.end();
3958            }
3959
3960            output_result
3961        }
3962    }
3963
3964    let tool_executor = YamlIrToolExecutor {
3965        llm_executor: executor,
3966        custom_worker,
3967        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
3968        node_usage: std::sync::Mutex::new(BTreeMap::new()),
3969        node_models: std::sync::Mutex::new(BTreeMap::new()),
3970        model_override: options.model.clone(),
3971        trace_id: telemetry_context.trace_id.clone(),
3972        trace_context: workflow_span_context.clone(),
3973        trace_input_context: options.trace.context.clone(),
3974        tenant_context: options.trace.tenant.clone(),
3975        payload_mode: options.telemetry.payload_mode,
3976        trace_sampled: telemetry_context.sampled,
3977    };
3978
3979    let runtime = WorkflowRuntime::new(
3980        ir,
3981        &NoopLlm,
3982        Some(&tool_executor),
3983        WorkflowRuntimeOptions::default(),
3984    );
3985
3986    let started = Instant::now();
3987    let result = match runtime.execute(workflow_input.clone(), None).await {
3988        Ok(result) => result,
3989        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
3990        Err(error) => {
3991            return Err(YamlWorkflowRunError::IrRuntime {
3992                message: error.to_string(),
3993            });
3994        }
3995    };
3996    let total_elapsed_ms = started.elapsed().as_millis();
3997
3998    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
3999    for (node_id, output) in result.node_outputs {
4000        if node_id == YAML_START_NODE_ID {
4001            continue;
4002        }
4003        outputs.insert(node_id, json!({"output": output}));
4004    }
4005
4006    let mut trace = Vec::new();
4007    let mut step_timings = Vec::new();
4008    let node_usage_map = tool_executor
4009        .node_usage
4010        .lock()
4011        .map(|usage| usage.clone())
4012        .unwrap_or_default();
4013    let llm_node_models = tool_executor
4014        .node_models
4015        .lock()
4016        .map(|models| models.clone())
4017        .unwrap_or_default();
4018    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
4019    for execution in result.node_executions {
4020        if execution.node_id == YAML_START_NODE_ID {
4021            continue;
4022        }
4023        trace.push(execution.node_id.clone());
4024        let usage = node_usage_map.get(&execution.node_id);
4025        if let Some(usage) = usage {
4026            llm_node_metrics.insert(
4027                execution.node_id.clone(),
4028                YamlLlmNodeMetrics {
4029                    elapsed_ms: 0,
4030                    prompt_tokens: usage.prompt_tokens,
4031                    completion_tokens: usage.completion_tokens,
4032                    total_tokens: usage.total_tokens,
4033                    thinking_tokens: usage.thinking_tokens,
4034                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
4035                },
4036            );
4037        }
4038        step_timings.push(YamlStepTiming {
4039            node_id: execution.node_id,
4040            node_kind: "ir_runtime".to_string(),
4041            elapsed_ms: 0,
4042            prompt_tokens: usage.map(|value| value.prompt_tokens),
4043            completion_tokens: usage.map(|value| value.completion_tokens),
4044            total_tokens: usage.map(|value| value.total_tokens),
4045            thinking_tokens: usage.and_then(|value| value.thinking_tokens),
4046            tokens_per_second: usage
4047                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
4048        });
4049    }
4050
4051    let terminal_node = result.terminal_node_id;
4052    let terminal_output = outputs
4053        .get(&terminal_node)
4054        .and_then(|v| v.get("output"))
4055        .cloned();
4056
4057    let email_text = workflow_input
4058        .get("email_text")
4059        .and_then(Value::as_str)
4060        .unwrap_or_default()
4061        .to_string();
4062
4063    let token_totals = tool_executor
4064        .token_totals
4065        .lock()
4066        .map(|totals| totals.clone())
4067        .unwrap_or_default();
4068
4069    if let Some(mut span) = workflow_span.take() {
4070        span.set_attribute("workflow_id", workflow.id.as_str());
4071        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
4072            span.set_attribute("trace_id", trace_id_value.as_str());
4073        }
4074        span.end();
4075        flush_workflow_tracer();
4076    }
4077
4078    Ok(Some(YamlWorkflowRunOutput {
4079        workflow_id: workflow.id.clone(),
4080        entry_node: workflow.entry_node.clone(),
4081        email_text,
4082        trace,
4083        outputs,
4084        terminal_node,
4085        terminal_output,
4086        step_timings,
4087        llm_node_metrics,
4088        llm_node_models,
4089        total_elapsed_ms,
4090        ttft_ms: None,
4091        total_input_tokens: token_totals.input_tokens,
4092        total_output_tokens: token_totals.output_tokens,
4093        total_tokens: token_totals.total_tokens,
4094        total_thinking_tokens: token_totals.thinking_tokens,
4095        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
4096        trace_id: telemetry_context.trace_id.clone(),
4097        metadata: telemetry_context.trace_id.as_ref().map(|value| {
4098            workflow_metadata_with_trace(
4099                options,
4100                value,
4101                telemetry_context.sampled,
4102                telemetry_context.trace_id_source,
4103            )
4104        }),
4105    }))
4106}
4107
4108fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4109    let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4110
4111    let mut nodes = serde_json::Map::new();
4112    if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4113        for (node_id, output) in node_outputs {
4114            nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4115        }
4116    }
4117
4118    json!({
4119        "input": input,
4120        "nodes": Value::Object(nodes),
4121        "globals": Value::Object(serde_json::Map::new())
4122    })
4123}
4124
4125pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4126    workflow: &YamlWorkflow,
4127    email_text: &str,
4128    executor: &dyn YamlWorkflowLlmExecutor,
4129    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4130    event_sink: Option<&dyn YamlWorkflowEventSink>,
4131) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4132    let workflow_input = json!({ "email_text": email_text });
4133    run_workflow_yaml_with_custom_worker_and_events(
4134        workflow,
4135        &workflow_input,
4136        executor,
4137        custom_worker,
4138        event_sink,
4139    )
4140    .await
4141}
4142
4143fn evaluate_switch_condition(
4144    condition: &str,
4145    context: &Value,
4146) -> Result<bool, YamlWorkflowRunError> {
4147    let (left, right) =
4148        condition
4149            .split_once("==")
4150            .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4151                condition: condition.to_string(),
4152            })?;
4153
4154    let left_path = left.trim().trim_start_matches("$.");
4155    let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4156    let left_value = resolve_path(context, left_path);
4157    Ok(left_value
4158        .and_then(Value::as_str)
4159        .map(|value| value == right_literal)
4160        .unwrap_or(false))
4161}
4162
4163fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4164    let normalized_path = path.trim().trim_start_matches("$.");
4165    let value = resolve_path(context, normalized_path)
4166        .ok_or_else(|| format!("messages_path not found: {path}"))?;
4167    let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4168        format!("messages_path must resolve to a list of messages: {path}; {err}")
4169    })?;
4170    if list.is_empty() {
4171        return Err(format!(
4172            "messages_path must not resolve to an empty list: {path}"
4173        ));
4174    }
4175
4176    let mut messages = Vec::with_capacity(list.len());
4177    for (index, item) in list.into_iter().enumerate() {
4178        let mut message = match item.role {
4179            Role::System => Message::system(item.content),
4180            Role::User => Message::user(item.content),
4181            Role::Assistant => Message::assistant(item.content),
4182            Role::Tool => {
4183                let tool_call_id = item
4184                    .tool_call_id
4185                    .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4186                Message::tool(item.content, tool_call_id)
4187            }
4188        };
4189
4190        if let Some(name) = item.name {
4191            message = message.with_name(name);
4192        }
4193
4194        messages.push(message);
4195    }
4196
4197    Ok(messages)
4198}
4199
/// Statically validates `workflow` and returns every diagnostic found.
///
/// No execution happens here. Checks performed:
/// - the entry node and both endpoints of every edge refer to declared nodes;
/// - `llm_call` nodes: non-empty model, stream+heal conflict (warning only),
///   `max_tool_roundtrips >= 1`, non-empty `tool_calls_global_key` when set,
///   `tool_choice` referencing a declared tool, and tool declarations with
///   unique non-empty names plus valid input/output JSON schemas;
/// - `switch` nodes: every branch target and the default target exist;
/// - `config.update_globals`: recognized op and required `from` field.
///
/// Severity is `Error` for everything except the stream/heal conflict, which
/// is a `Warning` (the runtime disables streaming in that case).
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
    let mut diagnostics = Vec::new();
    // Index nodes by id so all "does this id exist?" checks below are O(1).
    let known_ids: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();

    // The entry node must refer to a declared node.
    if !known_ids.contains_key(workflow.entry_node.as_str()) {
        diagnostics.push(YamlWorkflowDiagnostic {
            node_id: None,
            code: "missing_entry".to_string(),
            severity: YamlWorkflowDiagnosticSeverity::Error,
            message: format!("entry node '{}' does not exist", workflow.entry_node),
        });
    }

    // Both endpoints of every edge must be declared nodes.
    for edge in &workflow.edges {
        if !known_ids.contains_key(edge.from.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.from.clone()),
                code: "unknown_edge_from".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.from '{}' does not exist", edge.from),
            });
        }
        if !known_ids.contains_key(edge.to.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.to.clone()),
                code: "unknown_edge_to".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.to '{}' does not exist", edge.to),
            });
        }
    }

    for node in &workflow.nodes {
        // --- llm_call node checks ---
        if let Some(llm) = &node.node_type.llm_call {
            if llm.model.trim().is_empty() {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "empty_model".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.model must not be empty".to_string(),
                });
            }
            // Healing needs the full response, so streaming cannot apply;
            // surface the conflict as a warning rather than an error.
            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "stream_heal_conflict".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Warning,
                    message:
                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
                            .to_string(),
                });
            }

            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "invalid_max_tool_roundtrips".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
                });
            }

            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if global_key.trim().is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_calls_global_key".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
                    });
                }
            }

            // A forced tool choice must name a function that is actually
            // declared in this node's tool list, in whichever tools_format
            // the node uses.
            match normalize_tool_choice(llm.tool_choice.clone()) {
                Ok(choice) => {
                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
                                openai.function.name == choice_tool.function.name
                            }
                            (
                                YamlToolFormat::Simplified,
                                YamlToolDeclaration::Simplified(simple),
                            ) => simple.name == choice_tool.function.name,
                            _ => false,
                        }) {
                            diagnostics.push(YamlWorkflowDiagnostic {
                                node_id: Some(node.id.clone()),
                                code: "unknown_tool_choice_function".to_string(),
                                severity: YamlWorkflowDiagnosticSeverity::Error,
                                message: format!(
                                    "llm_call.tool_choice references unknown function '{}'",
                                    choice_tool.function.name
                                ),
                            });
                        }
                    }
                }
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_choice".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                }
            }

            // Normalize tool declarations; on failure, report the error and
            // continue with an empty list so the remaining checks still run.
            let normalized_tools = match normalize_llm_tools(llm) {
                Ok(tools) => tools,
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tools_format".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                    Vec::new()
                }
            };

            // Tool names must be non-empty and unique within the node, and
            // their schemas must be valid.
            let mut seen_tool_names = HashSet::new();
            for tool in &normalized_tools {
                let name = tool.definition.function.name.trim();
                if name.is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "tool function name must not be empty".to_string(),
                    });
                }
                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "duplicate_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "duplicate tool function name '{}' in node",
                            tool.definition.function.name
                        ),
                    });
                }

                // The input schema is required and must be a valid JSON schema.
                let schema = tool
                    .definition
                    .function
                    .parameters
                    .clone()
                    .unwrap_or(Value::Null);
                if schema.is_null() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "missing_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' is missing input schema",
                            tool.definition.function.name
                        ),
                    });
                } else if let Err(message) = validate_json_schema(&schema) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' has invalid input schema: {}",
                            tool.definition.function.name, message
                        ),
                    });
                }

                // The output schema is optional, but must validate when present.
                if let Some(output_schema) = tool.output_schema.as_ref() {
                    if let Err(message) = validate_json_schema(output_schema) {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "invalid_tool_output_schema".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "tool '{}' has invalid output schema: {}",
                                tool.definition.function.name, message
                            ),
                        });
                    }
                }
            }
        }

        // --- switch node checks: every branch target and the default exist ---
        if let Some(switch) = &node.node_type.switch {
            for branch in &switch.branches {
                if !known_ids.contains_key(branch.target.as_str()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "unknown_switch_target".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!("switch branch target '{}' does not exist", branch.target),
                    });
                }
            }
            if !known_ids.contains_key(switch.default.as_str()) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "unknown_switch_default".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: format!("switch default target '{}' does not exist", switch.default),
                });
            }
        }

        // --- config.update_globals checks ---
        if let Some(config) = node.config.as_ref() {
            if let Some(update_globals) = config.update_globals.as_ref() {
                for (key, update) in update_globals {
                    let is_valid_op =
                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
                    if !is_valid_op {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "unknown_update_op".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
                                key, update.op
                            ),
                        });
                    }

                    // Every op except 'increment' needs a source path ('from').
                    if update.op != "increment" && update.from.is_none() {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "missing_update_from".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' with op '{}' requires 'from'",
                                key, update.op
                            ),
                        });
                    }
                }
            }
        }
    }

    diagnostics
}
4448
4449fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4450    path.split('.')
4451        .filter(|segment| !segment.is_empty())
4452        .try_fold(value, |current, segment| {
4453            if let Ok(index) = segment.parse::<usize>() {
4454                return current.get(index);
4455            }
4456            current.get(segment)
4457        })
4458}
4459
4460fn interpolate_template(template: &str, context: &Value) -> String {
4461    let mut out = String::with_capacity(template.len());
4462    let mut rest = template;
4463
4464    loop {
4465        let Some(start) = rest.find("{{") else {
4466            out.push_str(rest);
4467            break;
4468        };
4469
4470        out.push_str(&rest[..start]);
4471        let after_start = &rest[start + 2..];
4472        let Some(end) = after_start.find("}}") else {
4473            out.push_str(&rest[start..]);
4474            break;
4475        };
4476
4477        let expr = after_start[..end].trim();
4478        let source_path = expr.trim_start_matches("$.");
4479        let replacement = resolve_path(context, source_path)
4480            .map(value_to_template_string)
4481            .unwrap_or_default();
4482        out.push_str(replacement.as_str());
4483
4484        rest = &after_start[end + 2..];
4485    }
4486
4487    out
4488}
4489
4490fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4491    let mut bindings = Vec::new();
4492    let mut rest = template;
4493
4494    loop {
4495        let Some(start) = rest.find("{{") else {
4496            break;
4497        };
4498
4499        let after_start = &rest[start + 2..];
4500        let Some(end) = after_start.find("}}") else {
4501            break;
4502        };
4503
4504        let expr = after_start[..end].trim();
4505        let source_path = expr.trim_start_matches("$.").to_string();
4506        let resolved = resolve_path(context, source_path.as_str()).cloned();
4507        let missing = resolved.is_none();
4508        let resolved_value = resolved.unwrap_or(Value::Null);
4509        bindings.push(YamlTemplateBinding {
4510            index: bindings.len(),
4511            expression: expr.to_string(),
4512            source_path,
4513            resolved_type: json_type_name(&resolved_value).to_string(),
4514            missing,
4515            resolved: resolved_value,
4516        });
4517
4518        rest = &after_start[end + 2..];
4519    }
4520
4521    bindings
4522}
4523
4524fn json_type_name(value: &Value) -> &'static str {
4525    match value {
4526        Value::Null => "null",
4527        Value::Bool(_) => "bool",
4528        Value::Number(_) => "number",
4529        Value::String(_) => "string",
4530        Value::Array(_) => "array",
4531        Value::Object(_) => "object",
4532    }
4533}
4534
4535fn value_to_template_string(value: &Value) -> String {
4536    match value {
4537        Value::Null => String::new(),
4538        Value::Bool(v) => v.to_string(),
4539        Value::Number(v) => v.to_string(),
4540        Value::String(v) => v.clone(),
4541        Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4542    }
4543}
4544
4545fn apply_set_globals(
4546    node: &YamlNode,
4547    outputs: &BTreeMap<String, Value>,
4548    workflow_input: &Value,
4549    globals: &mut serde_json::Map<String, Value>,
4550) {
4551    let Some(config) = node.config.as_ref() else {
4552        return;
4553    };
4554    let Some(set_globals) = config.set_globals.as_ref() else {
4555        return;
4556    };
4557
4558    let context = json!({
4559        "input": workflow_input,
4560        "nodes": outputs,
4561        "globals": Value::Object(globals.clone())
4562    });
4563
4564    for (key, expr) in set_globals {
4565        let value = resolve_path(&context, expr.as_str())
4566            .cloned()
4567            .unwrap_or(Value::Null);
4568        globals.insert(key.clone(), value);
4569    }
4570}
4571
/// Applies a node's `update_globals` directives to the mutable `globals` map.
///
/// Supported ops: `set`, `append`, `increment`, `merge`; any other op is
/// silently ignored here (validation elsewhere emits an `unknown_update_op`
/// diagnostic). Path expressions in `from` resolve against a context of
/// `{input, nodes, globals}`, where `globals` is a clone taken BEFORE the
/// loop — updates to one key do not observe updates made to earlier keys
/// within the same node.
fn apply_update_globals(
    node: &YamlNode,
    outputs: &BTreeMap<String, Value>,
    workflow_input: &Value,
    globals: &mut serde_json::Map<String, Value>,
) {
    let Some(config) = node.config.as_ref() else {
        return;
    };
    let Some(update_globals) = config.update_globals.as_ref() else {
        return;
    };

    // Snapshot of the lookup context (see doc comment re: clone timing).
    let context = json!({
        "input": workflow_input,
        "nodes": outputs,
        "globals": Value::Object(globals.clone())
    });

    for (key, update) in update_globals {
        match update.op.as_str() {
            // set: overwrite the key with the resolved value (Null if missing).
            "set" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    globals.insert(key.clone(), value);
                }
            }
            // append: push onto an array, creating it if absent; a non-array
            // existing value is promoted to a two-element array.
            "append" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    let entry = globals
                        .entry(key.clone())
                        .or_insert_with(|| Value::Array(Vec::new()));
                    match entry {
                        Value::Array(items) => items.push(value),
                        other => {
                            let existing = other.clone();
                            *other = Value::Array(vec![existing, value]);
                        }
                    }
                }
            }
            // increment: numeric add (default step 1.0); non-numeric or
            // missing current values are treated as 0. If the sum is not a
            // finite f64 the key is left untouched.
            "increment" => {
                let by = update.by.unwrap_or(1.0);
                let current = globals
                    .get(key.as_str())
                    .and_then(Value::as_f64)
                    .unwrap_or(0.0);
                if let Some(next) = serde_json::Number::from_f64(current + by) {
                    globals.insert(key.clone(), Value::Number(next));
                }
            }
            // merge: shallow-merge an object into the target object (source
            // keys win); a non-object target is replaced wholesale. Non-object
            // sources are ignored.
            "merge" => {
                if let Some(path) = update.from.as_ref() {
                    let source = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    if let Value::Object(source_map) = source {
                        let target = globals
                            .entry(key.clone())
                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
                        if let Value::Object(target_map) = target {
                            target_map.extend(source_map);
                        } else {
                            *target = Value::Object(source_map);
                        }
                    }
                }
            }
            _ => {}
        }
    }
}
4649
4650fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4651    if let Some(schema) = node
4652        .config
4653        .as_ref()
4654        .and_then(|cfg| cfg.output_schema.clone())
4655    {
4656        return schema;
4657    }
4658
4659    default_llm_output_schema()
4660}
4661
4662fn normalize_tool_choice(
4663    config: Option<YamlToolChoiceConfig>,
4664) -> Result<Option<ToolChoice>, String> {
4665    let Some(config) = config else {
4666        return Ok(None);
4667    };
4668
4669    let choice = match config {
4670        YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4671        YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4672            tool_type: ToolType::Function,
4673            function: ToolChoiceFunction { name: function },
4674        }),
4675        YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4676    };
4677
4678    Ok(Some(choice))
4679}
4680
4681fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4682    llm.tools
4683        .iter()
4684        .map(|tool| match (llm.tools_format, tool) {
4685            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4686                let definition = ToolDefinition {
4687                    tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4688                    function: ToolFunction {
4689                        name: openai.function.name.clone(),
4690                        description: openai.function.description.clone(),
4691                        parameters: openai.function.parameters.clone(),
4692                    },
4693                };
4694                Ok(YamlResolvedTool {
4695                    definition,
4696                    output_schema: openai.function.output_schema.clone(),
4697                })
4698            }
4699            (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4700                let definition = ToolDefinition {
4701                    tool_type: ToolType::Function,
4702                    function: ToolFunction {
4703                        name: simple.name.clone(),
4704                        description: simple.description.clone(),
4705                        parameters: Some(simple.input_schema.clone()),
4706                    },
4707                };
4708                Ok(YamlResolvedTool {
4709                    definition,
4710                    output_schema: simple.output_schema.clone(),
4711                })
4712            }
4713            (YamlToolFormat::Openai, _) => {
4714                Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4715            }
4716            (YamlToolFormat::Simplified, _) => {
4717                Err("tools_format=simplified requires simplified tool declarations".to_string())
4718            }
4719        })
4720        .collect()
4721}
4722
4723fn default_llm_output_schema() -> Value {
4724    json!({
4725        "type": "object",
4726        "additionalProperties": true
4727    })
4728}
4729
4730fn mock_rag(topic: &str) -> Value {
4731    let (kb_source, playbook) = match topic {
4732        "probation" => (
4733            "hr_policy/probation.md",
4734            "Collect manager review, performance evidence, and probation timeline.",
4735        ),
4736        "leave_request" => (
4737            "hr_policy/leave.md",
4738            "Validate leave balance, manager approval, and blackout dates.",
4739        ),
4740        "supply_chain_order_assessment" => (
4741            "supply_chain/order_assessment.md",
4742            "Review order specs, inventory risk, and vendor lead-time guidance.",
4743        ),
4744        "supply_chain_order_replacement" => (
4745            "supply_chain/order_replacement.md",
4746            "Collect order id, damage proof, and replacement SLA policy.",
4747        ),
4748        "termination_first_time_offense" => (
4749            "hr_policy/termination_first_offense.md",
4750            "Validate first-incident criteria and route to HRBP review.",
4751        ),
4752        "termination_repeated_offense" => (
4753            "hr_policy/termination_repeated_offense.md",
4754            "Collect prior warnings and escalation approvals before final action.",
4755        ),
4756        _ => (
4757            "shared/request_clarification.md",
4758            "Request clarifying details before routing.",
4759        ),
4760    };
4761
4762    json!({
4763        "kb_source": kb_source,
4764        "playbook": playbook,
4765    })
4766}
4767
/// Runs a nested workflow in response to a `run_workflow_graph` tool call.
///
/// `payload.workflow_id` selects a YAML path from `input.workflow_registry`
/// (a workflow_id -> yaml_path map that must be present in the parent
/// workflow's input). Recursion is bounded via `__subgraph_depth` /
/// `__subgraph_max_depth` (default max 3) carried in the input. The
/// subworkflow input starts from `payload.input` and inherits `messages`,
/// `email_text`, and the registry from the parent input when not already
/// provided; trace identity is propagated through the built options.
///
/// Returns a JSON summary (`workflow_id`, `workflow_path`, terminal node and
/// output, trace) on success, or a human-readable error string.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Depth guard: non-numeric/missing markers default to depth 0, max 3.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    // Start from the caller-supplied input object (empty map if absent).
    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    // Inherit selected parent-input fields unless the payload overrode them.
    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    // Forward the registry so nested subworkflows can resolve further ids.
    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Advance the depth markers for the child run (always overwritten).
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
4877
4878fn build_subworkflow_options(
4879    parent_options: &YamlWorkflowRunOptions,
4880    parent_trace_context: Option<&TraceContext>,
4881    resolved_trace_id: Option<&str>,
4882) -> YamlWorkflowRunOptions {
4883    let mut subworkflow_options = parent_options.clone();
4884    if parent_trace_context.is_some() || resolved_trace_id.is_some() {
4885        let trace_context = YamlWorkflowTraceContextInput {
4886            trace_id: resolved_trace_id
4887                .map(|value| value.to_string())
4888                .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
4889            span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
4890            parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
4891            traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
4892            tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
4893            baggage: parent_trace_context
4894                .map(|ctx| ctx.baggage.clone())
4895                .unwrap_or_default(),
4896        };
4897        subworkflow_options.trace.context = Some(trace_context);
4898    }
4899    subworkflow_options
4900}
4901
4902fn mock_custom_worker_output(
4903    handler: &str,
4904    payload: &Value,
4905) -> Result<Value, YamlWorkflowRunError> {
4906    if handler == "get_employee_record" {
4907        let employee_name = payload
4908            .get("employee_name")
4909            .and_then(Value::as_str)
4910            .unwrap_or("Unknown Employee")
4911            .trim();
4912        let normalized_name = if employee_name.is_empty() {
4913            "Unknown Employee"
4914        } else {
4915            employee_name
4916        };
4917
4918        let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
4919            "alex johnson" => ("EMP-2041", "Austin"),
4920            "priya sharma" => ("EMP-3378", "Bengaluru"),
4921            "marcus lee" => ("EMP-1196", "Singapore"),
4922            "sarah chen" => ("EMP-4450", "Toronto"),
4923            _ => ("EMP-0000", "Unassigned"),
4924        };
4925
4926        return Ok(json!({
4927            "employee_name": normalized_name,
4928            "employee_id": employee_id,
4929            "location": location,
4930        }));
4931    }
4932
4933    if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
4934        let mut value = mock_rag(topic);
4935        if let Value::Object(object) = &mut value {
4936            object.insert("handler".to_string(), Value::String(handler.to_string()));
4937        }
4938        return Ok(value);
4939    }
4940
4941    Err(YamlWorkflowRunError::UnsupportedCustomHandler {
4942        handler: handler.to_string(),
4943    })
4944}
4945
/// Top-level YAML workflow document: a graph of nodes plus explicit edges,
/// entered at `entry_node`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier as declared in the YAML file.
    pub id: String,
    /// Id of the node where execution starts.
    pub entry_node: String,
    /// Node declarations; defaults to empty when omitted.
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Directed edges between node ids; defaults to empty when omitted.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
4955
/// A single workflow node: an id, its type, and optional configuration
/// (prompt, schemas, payload, and globals directives).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    pub id: String,
    /// Which node kind this is; see `kind_name` for the priority used when
    /// more than one variant field happens to be set.
    pub node_type: YamlNodeType,
    pub config: Option<YamlNodeConfig>,
}
4962
4963impl YamlNode {
4964    fn kind_name(&self) -> &'static str {
4965        if self.node_type.llm_call.is_some() {
4966            "llm_call"
4967        } else if self.node_type.switch.is_some() {
4968            "switch"
4969        } else if self.node_type.custom_worker.is_some() {
4970            "custom_worker"
4971        } else {
4972            "unknown"
4973        }
4974    }
4975}
4976
/// Node type expressed as mutually-exclusive optional fields; exactly one
/// is expected to be `Some` in a well-formed workflow (the struct itself
/// does not enforce this).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
}
4983
/// Configuration for an LLM-call node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model identifier passed to the completion client.
    pub model: String,
    /// Enables streaming completion when `Some(true)`.
    pub stream: Option<bool>,
    /// Streams structured JSON output as plain text chunks when set.
    pub stream_json_as_text: Option<bool>,
    /// Enables JSON healing of malformed model output when set.
    pub heal: Option<bool>,
    /// Optional path expression selecting the message list from the context.
    pub messages_path: Option<String>,
    /// Appends the node prompt as a trailing user message when set.
    pub append_prompt_as_user: Option<bool>,
    /// Declaration format of `tools`; defaults to `openai`.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tool declarations, each matching `tools_format`; defaults to empty.
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional tool-choice directive (mode, function shorthand, or OpenAI form).
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Cap on model->tool->model roundtrips for this node.
    pub max_tool_roundtrips: Option<u8>,
    /// Global key under which executed tool calls are recorded, when set.
    pub tool_calls_global_key: Option<String>,
}
5000
/// Wire format of tool declarations in YAML: OpenAI-style (the default)
/// or the simplified `{name, input_schema, ...}` shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}
5008
/// A tool declaration in either supported format. Untagged: serde tries
/// the OpenAI shape first, then the simplified shape; `normalize_llm_tools`
/// enforces that the parsed variant matches the node's `tools_format`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
5015
/// OpenAI-style tool declaration: optional `type` (defaults to function
/// during normalization) plus a `function` body.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
5022
/// Function body of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    /// JSON Schema for the tool's arguments, when provided.
    pub parameters: Option<Value>,
    /// Optional JSON Schema describing the tool's result.
    pub output_schema: Option<Value>,
}
5030
/// Simplified tool declaration: flat fields with a required input schema.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    /// JSON Schema for the tool's arguments (required in this format).
    pub input_schema: Value,
    /// Optional JSON Schema describing the tool's result.
    pub output_schema: Option<Value>,
}
5038
/// Tool-choice directive in YAML. Untagged: a bare mode string, a
/// `{ function: name }` shorthand, or a full OpenAI-style tool-choice
/// object — tried in that order by serde.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
5046
/// Switch node: ordered conditional branches with a required default target.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    /// Branches evaluated in order; defaults to empty when omitted.
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id used when no branch condition matches.
    pub default: String,
}
5053
/// One branch of a switch node: a condition expression and the node id to
/// route to when it holds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}
5059
/// Custom-worker node: names the handler dispatched to the registered
/// `YamlWorkflowCustomWorkerExecutor` (or the built-in mock).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}
5064
/// Per-node configuration shared across node kinds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt template (supports `{{ ... }}` interpolation).
    pub prompt: Option<String>,
    /// Expected output schema; YAML may spell it `schema` or `output_schema`.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Static payload passed to custom workers.
    pub payload: Option<Value>,
    /// Global key -> path expression, applied by `apply_set_globals`.
    pub set_globals: Option<HashMap<String, String>>,
    /// Global key -> update directive, applied by `apply_update_globals`.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
5074
/// A single `update_globals` directive.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    /// One of "set" | "append" | "increment" | "merge" (validated elsewhere).
    pub op: String,
    /// Source path expression; required for every op except "increment".
    pub from: Option<String>,
    /// Step for "increment"; defaults to 1.0 when omitted.
    pub by: Option<f64>,
}
5081
/// A directed edge between two node ids in the workflow graph.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
5087
5088#[cfg(test)]
5089mod tests {
5090    use super::*;
5091    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5092    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5093    use simple_agent_type::tool::{ToolCallFunction, ToolType};
5094    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5095    use simple_agents_core::SimpleAgentsClientBuilder;
5096    use std::fs;
5097    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5098    use std::sync::{Arc, Mutex, OnceLock};
5099
5100    fn stream_debug_env_lock() -> &'static Mutex<()> {
5101        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5102        LOCK.get_or_init(|| Mutex::new(()))
5103    }
5104
5105    struct MockExecutor;
5106
5107    struct RecordingSink {
5108        events: Mutex<Vec<YamlWorkflowEvent>>,
5109    }
5110
5111    struct CancelAfterFirstEventSink {
5112        cancelled: AtomicBool,
5113    }
5114
5115    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
5116        fn emit(&self, _event: &YamlWorkflowEvent) {
5117            self.cancelled.store(true, Ordering::SeqCst);
5118        }
5119
5120        fn is_cancelled(&self) -> bool {
5121            self.cancelled.load(Ordering::SeqCst)
5122        }
5123    }
5124
5125    struct CountingExecutor {
5126        call_count: AtomicUsize,
5127    }
5128
5129    struct CapturingWorker {
5130        context: Mutex<Option<Value>>,
5131    }
5132
5133    struct ToolLoopProvider;
5134
5135    struct UnknownToolProvider;
5136
5137    #[async_trait]
5138    impl Provider for ToolLoopProvider {
5139        fn name(&self) -> &str {
5140            "openai"
5141        }
5142
5143        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5144            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5145            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
5146        }
5147
5148        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5149            let request: CompletionRequest =
5150                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5151
5152            let has_tools = request
5153                .tools
5154                .as_ref()
5155                .is_some_and(|tools| !tools.is_empty());
5156            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
5157
5158            let response = if has_tools && !has_tool_result {
5159                CompletionResponse {
5160                    id: "resp_tool_1".to_string(),
5161                    model: request.model.clone(),
5162                    choices: vec![CompletionChoice {
5163                        index: 0,
5164                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
5165                            id: "call_get_context".to_string(),
5166                            tool_type: ToolType::Function,
5167                            function: ToolCallFunction {
5168                                name: "get_customer_context".to_string(),
5169                                arguments: "{\"order_id\":\"123\"}".to_string(),
5170                            },
5171                        }]),
5172                        finish_reason: FinishReason::ToolCalls,
5173                        logprobs: None,
5174                    }],
5175                    usage: Usage::new(10, 5),
5176                    created: None,
5177                    provider: Some(self.name().to_string()),
5178                    healing_metadata: None,
5179                }
5180            } else if has_tools && has_tool_result {
5181                CompletionResponse {
5182                    id: "resp_tool_2".to_string(),
5183                    model: request.model.clone(),
5184                    choices: vec![CompletionChoice {
5185                        index: 0,
5186                        message: Message::assistant("{\"state\":\"done\"}"),
5187                        finish_reason: FinishReason::Stop,
5188                        logprobs: None,
5189                    }],
5190                    usage: Usage::new(12, 6),
5191                    created: None,
5192                    provider: Some(self.name().to_string()),
5193                    healing_metadata: None,
5194                }
5195            } else {
5196                let prompt = request
5197                    .messages
5198                    .iter()
5199                    .rev()
5200                    .find(|m| m.role == Role::User)
5201                    .map(|m| m.content.clone())
5202                    .unwrap_or_default();
5203                let payload = json!({
5204                    "subject": "ok",
5205                    "body": prompt,
5206                })
5207                .to_string();
5208                CompletionResponse {
5209                    id: "resp_final".to_string(),
5210                    model: request.model.clone(),
5211                    choices: vec![CompletionChoice {
5212                        index: 0,
5213                        message: Message::assistant(payload),
5214                        finish_reason: FinishReason::Stop,
5215                        logprobs: None,
5216                    }],
5217                    usage: Usage::new(8, 4),
5218                    created: None,
5219                    provider: Some(self.name().to_string()),
5220                    healing_metadata: None,
5221                }
5222            };
5223
5224            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5225            Ok(ProviderResponse::new(200, body))
5226        }
5227
5228        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5229            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5230        }
5231    }
5232
5233    #[async_trait]
5234    impl Provider for UnknownToolProvider {
5235        fn name(&self) -> &str {
5236            "openai"
5237        }
5238
5239        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5240            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5241            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
5242        }
5243
5244        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5245            let request: CompletionRequest =
5246                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5247
5248            let response = CompletionResponse {
5249                id: "resp_unknown_tool".to_string(),
5250                model: request.model,
5251                choices: vec![CompletionChoice {
5252                    index: 0,
5253                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
5254                        id: "call_unknown".to_string(),
5255                        tool_type: ToolType::Function,
5256                        function: ToolCallFunction {
5257                            name: "unknown_tool".to_string(),
5258                            arguments: "{}".to_string(),
5259                        },
5260                    }]),
5261                    finish_reason: FinishReason::ToolCalls,
5262                    logprobs: None,
5263                }],
5264                usage: Usage::new(5, 2),
5265                created: None,
5266                provider: Some(self.name().to_string()),
5267                healing_metadata: None,
5268            };
5269
5270            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5271            Ok(ProviderResponse::new(200, body))
5272        }
5273
5274        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5275            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5276        }
5277    }
5278
5279    struct FixedToolWorker {
5280        payload: Value,
5281    }
5282
5283    struct CountingToolWorker {
5284        execute_calls: AtomicUsize,
5285    }
5286
5287    #[async_trait]
5288    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
5289        async fn execute(
5290            &self,
5291            _handler: &str,
5292            _payload: &Value,
5293            _email_text: &str,
5294            _context: &Value,
5295        ) -> Result<Value, String> {
5296            Ok(self.payload.clone())
5297        }
5298    }
5299
5300    #[async_trait]
5301    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
5302        async fn execute(
5303            &self,
5304            _handler: &str,
5305            _payload: &Value,
5306            _email_text: &str,
5307            _context: &Value,
5308        ) -> Result<Value, String> {
5309            self.execute_calls.fetch_add(1, Ordering::SeqCst);
5310            Ok(json!({"ok": true}))
5311        }
5312    }
5313
    #[async_trait]
    impl YamlWorkflowLlmExecutor for CountingExecutor {
        /// Counts LLM invocations and returns a fixed `{"state":"ok"}` payload
        /// with no usage, TTFT, or tool calls.
        async fn complete_structured(
            &self,
            _request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: None,
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
5330
    impl YamlWorkflowEventSink for RecordingSink {
        /// Appends a clone of every emitted event so tests can inspect the
        /// full event stream after the workflow finishes.
        fn emit(&self, event: &YamlWorkflowEvent) {
            self.events
                .lock()
                .expect("recording sink lock should not be poisoned")
                .push(event.clone());
        }
    }
5339
5340    #[async_trait]
5341    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
5342        async fn execute(
5343            &self,
5344            _handler: &str,
5345            _payload: &Value,
5346            _email_text: &str,
5347            context: &Value,
5348        ) -> Result<Value, String> {
5349            let mut guard = self
5350                .context
5351                .lock()
5352                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
5353            *guard = Some(context.clone());
5354            Ok(json!({"ok": true}))
5355        }
5356    }
5357
5358    #[async_trait]
5359    impl YamlWorkflowLlmExecutor for MockExecutor {
5360        async fn complete_structured(
5361            &self,
5362            request: YamlLlmExecutionRequest,
5363            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5364        ) -> Result<YamlLlmExecutionResult, String> {
5365            let prompt = request.prompt;
5366            if prompt.contains("exactly one category") {
5367                return Ok(YamlLlmExecutionResult {
5368                    payload: json!({"category":"termination","reason":"mock"}),
5369                    usage: Some(YamlLlmTokenUsage {
5370                        prompt_tokens: 10,
5371                        completion_tokens: 5,
5372                        total_tokens: 15,
5373                        thinking_tokens: None,
5374                    }),
5375                    ttft_ms: None,
5376                    tool_calls: Vec::new(),
5377                });
5378            }
5379            if prompt.contains("Determine termination subtype") {
5380                return Ok(YamlLlmExecutionResult {
5381                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5382                    usage: Some(YamlLlmTokenUsage {
5383                        prompt_tokens: 12,
5384                        completion_tokens: 6,
5385                        total_tokens: 18,
5386                        thinking_tokens: None,
5387                    }),
5388                    ttft_ms: None,
5389                    tool_calls: Vec::new(),
5390                });
5391            }
5392            if prompt.contains("Determine supply chain subtype") {
5393                return Ok(YamlLlmExecutionResult {
5394                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
5395                    usage: Some(YamlLlmTokenUsage {
5396                        prompt_tokens: 11,
5397                        completion_tokens: 4,
5398                        total_tokens: 15,
5399                        thinking_tokens: None,
5400                    }),
5401                    ttft_ms: None,
5402                    tool_calls: Vec::new(),
5403                });
5404            }
5405            Err("unexpected prompt".to_string())
5406        }
5407    }
5408
    // End-to-end: a branching YAML workflow (LLM classify -> switch -> subtype
    // classify -> switch -> RAG custom worker) runs to completion and reports
    // per-step timings plus aggregated token totals from MockExecutor's usage.
    #[tokio::test]
    async fn runs_yaml_workflow_and_returns_step_timings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Determine termination subtype:
        {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
            .await
            .expect("yaml workflow should execute");

        assert_eq!(output.workflow_id, "email-intake-classification");
        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
        assert!(!output.step_timings.is_empty());
        // One timing entry per executed node.
        assert_eq!(output.step_timings.len(), output.trace.len());
        assert!(output
            .outputs
            .contains_key("rag_termination_repeated_offense"));
        // Totals sum MockExecutor's usage: 10+12 prompt, 5+6 completion, 15+18 total.
        assert_eq!(output.total_input_tokens, 22);
        assert_eq!(output.total_output_tokens, 11);
        assert_eq!(output.total_tokens, 33);
    }
5482
    // Running a single-node LLM workflow must emit a `node_llm_input_resolved`
    // telemetry event whose metadata includes the model, stream/heal flags, the
    // fully resolved prompt text, and per-binding resolution details.
    #[tokio::test]
    async fn emits_resolved_llm_input_event_with_bindings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_email_workflow_yaml_with_custom_worker_and_events(
            &workflow,
            "Need help with termination",
            &MockExecutor,
            None,
            Some(&sink),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.terminal_node, "classify_top_level");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let llm_event = events
            .iter()
            .find(|event| event.event_type == "node_llm_input_resolved")
            .expect("expected llm input telemetry event");

        let metadata = llm_event
            .metadata
            .as_ref()
            .expect("llm input event must include metadata");
        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
        assert_eq!(metadata["stream_requested"], Value::Bool(false));
        assert_eq!(metadata["heal_requested"], Value::Bool(false));
        // The {{ input.email_text }} placeholder must have been substituted.
        assert!(metadata["prompt"]
            .as_str()
            .expect("prompt should be a string")
            .contains("Need help with termination"));

        // Exactly one template binding was resolved, and it resolved cleanly.
        let bindings = metadata["bindings"]
            .as_array()
            .expect("bindings should be an array");
        assert_eq!(bindings.len(), 1);
        assert_eq!(
            bindings[0]["source_path"],
            Value::String("input.email_text".to_string())
        );
        assert_eq!(
            bindings[0]["resolved"],
            Value::String("Need help with termination".to_string())
        );
        assert_eq!(bindings[0]["missing"], Value::Bool(false));
        assert_eq!(
            bindings[0]["resolved_type"],
            Value::String("string".to_string())
        );
    }
5555
    // With default run options, the `workflow_completed` event carries a
    // `nerdstats` metadata object: totals from provider usage, `step_details`
    // and `llm_node_models` (the verbose `step_timings`/`llm_node_metrics`
    // keys are intentionally absent).
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_by_default() {
        let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // Nerdstats mirrors the run output's identifiers and totals.
        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
        assert_eq!(
            nerdstats["terminal_node"],
            Value::String(output.terminal_node)
        );
        assert_eq!(
            nerdstats["total_tokens"],
            Value::Number(output.total_tokens.into())
        );
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_usage".to_string())
        );
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert!(nerdstats.get("step_details").is_some());
        assert!(nerdstats.get("llm_node_models").is_some());
        assert_eq!(
            nerdstats["llm_node_models"]["classify"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
        // MockExecutor reports no TTFT for non-streaming calls.
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
5632
    // Disabling `telemetry.nerdstats` in the run options must strip metadata
    // from the `workflow_completed` event entirely.
    #[tokio::test]
    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
        let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };
        // Only the nerdstats flag is changed; everything else stays default.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                nerdstats: false,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &options,
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        assert!(completed.metadata.is_none());
    }
5682
    /// Mock executor that reports a TTFT only when the request asked for streaming.
    struct StreamAwareMockExecutor;
5684
5685    #[async_trait]
5686    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
5687        async fn complete_structured(
5688            &self,
5689            request: YamlLlmExecutionRequest,
5690            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5691        ) -> Result<YamlLlmExecutionResult, String> {
5692            Ok(YamlLlmExecutionResult {
5693                payload: json!({"state":"ok"}),
5694                usage: Some(YamlLlmTokenUsage {
5695                    prompt_tokens: 20,
5696                    completion_tokens: 10,
5697                    total_tokens: 30,
5698                    thinking_tokens: None,
5699                }),
5700                ttft_ms: if request.stream { Some(12) } else { None },
5701                tool_calls: Vec::new(),
5702            })
5703        }
5704    }
5705
    // A streaming llm_call node that still reports provider usage must surface
    // token totals and a TTFT value in the workflow_completed nerdstats.
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
        let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
    config:
      prompt: |
        Return JSON only:
        {"state":"ok"}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &StreamAwareMockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // StreamAwareMockExecutor reports 30 total tokens and 12ms TTFT when streaming.
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["llm_node_models"]["classify"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
    }
5769
    // When an llm_call step carries no token counts (e.g. a stream whose
    // provider never reported usage), workflow_nerdstats must flag token
    // metrics as unavailable, null out the totals, and still emit step_details.
    #[test]
    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
        // Hand-built run output: one llm_call step with every token field None.
        let output = YamlWorkflowRunOutput {
            workflow_id: "workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["llm_node".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "llm_node".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "llm_node".to_string(),
                node_kind: "llm_call".to_string(),
                elapsed_ms: 100,
                prompt_tokens: None,
                completion_tokens: None,
                total_tokens: None,
                thinking_tokens: None,
                tokens_per_second: None,
            }],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: None,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_tokens: 0,
            total_thinking_tokens: None,
            tokens_per_second: 0.0,
            trace_id: Some("trace-1".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_stream_usage_unavailable".to_string())
        );
        // Totals are reported as null rather than misleading zeros.
        assert_eq!(nerdstats["total_tokens"], Value::Null);
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("llm_node".to_string())
        );
        assert!(nerdstats["llm_node_models"]
            .as_object()
            .expect("llm_node_models should be an object")
            .is_empty());
    }
5822
    // A run output with real token counts and a TTFT must see that TTFT
    // propagated verbatim into the nerdstats payload.
    #[test]
    fn workflow_nerdstats_includes_ttft_when_available() {
        // Hand-built run output: one llm_call step with full token metrics.
        let output = YamlWorkflowRunOutput {
            workflow_id: "workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["llm_node".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "llm_node".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "llm_node".to_string(),
                node_kind: "llm_call".to_string(),
                elapsed_ms: 100,
                prompt_tokens: Some(10),
                completion_tokens: Some(15),
                total_tokens: Some(25),
                thinking_tokens: None,
                tokens_per_second: Some(150.0),
            }],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: Some(42),
            total_input_tokens: 10,
            total_output_tokens: 15,
            total_tokens: 25,
            total_thinking_tokens: None,
            tokens_per_second: 150.0,
            trace_id: Some("trace-2".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("llm_node".to_string())
        );
        assert!(nerdstats["llm_node_models"]
            .as_object()
            .expect("llm_node_models should be an object")
            .is_empty());
    }
5869
    /// Mock executor that only succeeds when handed exactly two chat-history messages.
    struct MessageHistoryExecutor;
5871
5872    #[async_trait]
5873    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
5874        async fn complete_structured(
5875            &self,
5876            request: YamlLlmExecutionRequest,
5877            _event_sink: Option<&dyn YamlWorkflowEventSink>,
5878        ) -> Result<YamlLlmExecutionResult, String> {
5879            let messages = request
5880                .messages
5881                .ok_or_else(|| "expected messages in request".to_string())?;
5882            if messages.len() != 2 {
5883                return Err(format!("expected 2 messages, got {}", messages.len()));
5884            }
5885            Ok(YamlLlmExecutionResult {
5886                payload: json!({"category":"termination","reason":"history"}),
5887                usage: Some(YamlLlmTokenUsage {
5888                    prompt_tokens: 7,
5889                    completion_tokens: 3,
5890                    total_tokens: 10,
5891                    thinking_tokens: None,
5892                }),
5893                ttft_ms: None,
5894                tool_calls: Vec::new(),
5895            })
5896        }
5897    }
5898
    // An llm_call node configured with `messages_path` and
    // `append_prompt_as_user: false` must forward the chat history from the
    // workflow input verbatim (MessageHistoryExecutor asserts it gets 2 messages).
    #[tokio::test]
    async fn supports_messages_path_in_workflow_input() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // email_text is present but must be ignored in favor of `messages`.
        let input = json!({
            "email_text": "ignored",
            "messages": [
                {"role": "system", "content": "You are a classifier"},
                {"role": "user", "content": "Please classify this"}
            ]
        });

        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
            .await
            .expect("workflow should use chat history from input");

        assert_eq!(output.terminal_node, "classify_top_level");
        // "history" is the marker reason MessageHistoryExecutor returns on success.
        assert_eq!(
            output.outputs["classify_top_level"]["output"]["reason"],
            Value::String("history".to_string())
        );
    }
5932
    // The three public entrypoints (base, custom-worker wrapper, events/options
    // wrapper) are thin layers over the same runner and must produce identical
    // ids, terminal nodes, outputs, and token totals for the same input.
    #[tokio::test]
    async fn wrapper_entrypoints_produce_equivalent_outputs() {
        let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let input = json!({"email_text":"hello"});

        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
            .await
            .expect("base entrypoint should execute");
        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
            .await
            .expect("custom worker wrapper should execute");
        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &input,
            &MockExecutor,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("events/options wrapper should execute");

        assert_eq!(a.workflow_id, b.workflow_id);
        assert_eq!(a.workflow_id, c.workflow_id);
        assert_eq!(a.terminal_node, b.terminal_node);
        assert_eq!(a.terminal_node, c.terminal_node);
        assert_eq!(a.outputs, b.outputs);
        assert_eq!(a.outputs, c.outputs);
        assert_eq!(a.total_tokens, b.total_tokens);
        assert_eq!(a.total_tokens, c.total_tokens);
    }
5978
    // Tool-calling round trip: the first node's tool call is executed by the
    // custom worker, recorded under the node's `tool_calls` output, and (via
    // `tool_calls_global_key: audit`) made addressable from a later node's
    // prompt as `globals.audit.0.output.*`.
    #[tokio::test]
    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
        let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
      output_schema:
        type: object
        properties:
          subject: { type: string }
          body: { type: string }
        required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // The worker satisfies the tool's output_schema with a fixed name.
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
        // The tool invocation trace is captured on the calling node's output.
        assert_eq!(
            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
            Value::String("Ava".to_string())
        );
        // The second node's prompt resolved the globals reference to "Ava".
        let body = output.outputs["personalize"]["output"]["body"]
            .as_str()
            .expect("body should be string");
        assert!(body.contains("Ava"));
    }
6061
    // A tool result that violates the tool's declared output_schema must
    // hard-fail the node with an Llm error, not be passed through silently.
    #[tokio::test]
    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
        let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // Deliberately violates the declared output_schema (missing customer_name,
        // extra key with additionalProperties: false).
        let worker = FixedToolWorker {
            payload: json!({"unexpected": "shape"}),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should hard-fail on schema mismatch");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("output failed schema validation"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }
    }
6123
    // When the model requests a tool that was never declared, the runner must
    // reject it before invoking the custom worker at all — verified via the
    // worker's untouched invocation counter.
    #[tokio::test]
    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
        let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // UnknownToolProvider makes the model request a tool named "unknown_tool".
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(UnknownToolProvider))
            .build()
            .expect("client should build");
        let worker = CountingToolWorker {
            execute_calls: AtomicUsize::new(0),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should reject unknown tool before executing worker");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }

        // The custom worker must never have been reached.
        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
    }
6180
6181    #[test]
6182    fn validates_tools_format_mismatch() {
6183        let yaml = r#"
6184id: mismatch
6185entry_node: generate
6186nodes:
6187  - id: generate
6188    node_type:
6189      llm_call:
6190        model: gpt-4.1
6191        tools_format: openai
6192        tools:
6193          - name: get_customer_context
6194            input_schema:
6195              type: object
6196              properties:
6197                order_id: { type: string }
6198              required: [order_id]
6199"#;
6200
6201        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6202        let diagnostics = verify_yaml_workflow(&workflow);
6203        assert!(diagnostics
6204            .iter()
6205            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
6206    }
6207
6208    #[test]
6209    fn mock_custom_worker_supports_get_employee_record() {
6210        let result = mock_custom_worker_output(
6211            "get_employee_record",
6212            &json!({"employee_name": "Alex Johnson"}),
6213        )
6214        .expect("mock tool should resolve employee record");
6215
6216        assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
6217        assert_eq!(result["location"], Value::String("Austin".to_string()));
6218    }
6219
6220    #[tokio::test]
6221    async fn custom_worker_receives_trace_context_block() {
6222        let yaml = r#"
6223id: custom-worker-trace-context
6224entry_node: lookup
6225nodes:
6226  - id: lookup
6227    node_type:
6228      custom_worker:
6229        handler: GetRagData
6230    config:
6231      payload:
6232        topic: demo
6233"#;
6234
6235        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6236        let worker = CapturingWorker {
6237            context: Mutex::new(None),
6238        };
6239        let options = YamlWorkflowRunOptions {
6240            trace: YamlWorkflowTraceOptions {
6241                context: Some(YamlWorkflowTraceContextInput {
6242                    trace_id: Some("trace-fixed-ctx".to_string()),
6243                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
6244                    ..YamlWorkflowTraceContextInput::default()
6245                }),
6246                tenant: YamlWorkflowTraceTenantContext {
6247                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
6248                    request_id: Some("turn-7".to_string()),
6249                    ..YamlWorkflowTraceTenantContext::default()
6250                },
6251            },
6252            ..YamlWorkflowRunOptions::default()
6253        };
6254
6255        run_workflow_yaml_with_custom_worker_and_events_and_options(
6256            &workflow,
6257            &json!({"email_text":"hello"}),
6258            &MockExecutor,
6259            Some(&worker),
6260            None,
6261            &options,
6262        )
6263        .await
6264        .expect("workflow should execute");
6265
6266        let captured_context = worker
6267            .context
6268            .lock()
6269            .expect("capturing worker lock should not be poisoned")
6270            .clone()
6271            .expect("custom worker should receive context");
6272
6273        assert_eq!(
6274            captured_context
6275                .get("trace")
6276                .and_then(|trace| trace.get("context"))
6277                .and_then(|context| context.get("trace_id"))
6278                .and_then(Value::as_str),
6279            Some("trace-fixed-ctx")
6280        );
6281        assert_eq!(
6282            captured_context
6283                .get("trace")
6284                .and_then(|trace| trace.get("context"))
6285                .and_then(|context| context.get("traceparent"))
6286                .and_then(Value::as_str),
6287            Some("00-trace-fixed-ctx-span-fixed-01")
6288        );
6289        assert_eq!(
6290            captured_context
6291                .get("trace")
6292                .and_then(|trace| trace.get("tenant"))
6293                .and_then(|tenant| tenant.get("conversation_id"))
6294                .and_then(Value::as_str),
6295            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
6296        );
6297    }
6298
6299    #[tokio::test]
6300    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
6301        let yaml = r#"
6302id: cancellation-test
6303entry_node: classify
6304nodes:
6305  - id: classify
6306    node_type:
6307      llm_call:
6308        model: gpt-4.1
6309    config:
6310      prompt: |
6311        Classify this email into exactly one category:
6312        {{ input.email_text }}
6313"#;
6314
6315        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6316        let executor = CountingExecutor {
6317            call_count: AtomicUsize::new(0),
6318        };
6319        let sink = CancelAfterFirstEventSink {
6320            cancelled: AtomicBool::new(false),
6321        };
6322
6323        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6324            &workflow,
6325            &json!({"email_text":"hello"}),
6326            &executor,
6327            None,
6328            Some(&sink),
6329            &YamlWorkflowRunOptions::default(),
6330        )
6331        .await
6332        .expect_err("workflow should stop when sink signals cancellation");
6333
6334        assert!(matches!(
6335            err,
6336            YamlWorkflowRunError::EventSinkCancelled { .. }
6337        ));
6338        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
6339    }
6340
6341    #[tokio::test]
6342    async fn rejects_invalid_messages_path_shape() {
6343        let yaml = r#"
6344id: email-intake-classification
6345entry_node: classify_top_level
6346nodes:
6347  - id: classify_top_level
6348    node_type:
6349      llm_call:
6350        model: gpt-4.1
6351        messages_path: input.messages
6352"#;
6353
6354        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6355        let input = json!({
6356            "email_text": "ignored",
6357            "messages": "not-a-list"
6358        });
6359
6360        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6361            .await
6362            .expect_err("workflow should fail for invalid messages shape");
6363
6364        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
6365    }
6366
6367    #[test]
6368    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
6369        let yaml = r#"
6370id: chat-workflow
6371entry_node: decide
6372nodes:
6373  - id: decide
6374    node_type:
6375      switch:
6376        branches:
6377          - condition: '$.input.mode == "draft"'
6378            target: draft
6379        default: ask
6380  - id: draft
6381    node_type:
6382      llm_call:
6383        model: gpt-4.1
6384  - id: ask
6385    node_type:
6386      llm_call:
6387        model: gpt-4.1
6388edges:
6389  - from: draft
6390    to: ask
6391"#;
6392
6393        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6394        let mermaid = yaml_workflow_to_mermaid(&workflow);
6395
6396        assert!(mermaid.contains("flowchart TD"));
6397        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
6398        assert!(mermaid.contains("decide -- \"default\" --> ask"));
6399        assert!(mermaid.contains("draft --> ask"));
6400    }
6401
6402    #[test]
6403    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
6404        let yaml = r#"
6405id: tool-graph
6406entry_node: chat
6407nodes:
6408  - id: chat
6409    node_type:
6410      llm_call:
6411        model: gemini-3-flash
6412        tools_format: simplified
6413        tools:
6414          - name: run_workflow_graph
6415            input_schema:
6416              type: object
6417              properties:
6418                workflow_id: { type: string }
6419              required: [workflow_id]
6420edges: []
6421"#;
6422
6423        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6424        let mermaid = yaml_workflow_to_mermaid(&workflow);
6425
6426        assert!(mermaid.contains("chat__tool_0"));
6427        assert!(mermaid.contains("chat -.-> chat__tool_0"));
6428        assert!(mermaid.contains("classDef toolNode"));
6429        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
6430    }
6431
6432    #[test]
6433    fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6434        let base_dir = std::env::temp_dir().join(format!(
6435            "simple_agents_mermaid_subgraph_{}",
6436            std::time::SystemTime::now()
6437                .duration_since(std::time::UNIX_EPOCH)
6438                .expect("unix epoch")
6439                .as_nanos()
6440        ));
6441        fs::create_dir_all(&base_dir).expect("temp dir should be created");
6442
6443        let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6444        let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6445
6446        let orchestrator_yaml = r#"
6447id: email-chat-orchestrator-with-subgraph-tool
6448entry_node: respond_casual
6449nodes:
6450  - id: respond_casual
6451    node_type:
6452      llm_call:
6453        model: gemini-3-flash
6454        tools_format: simplified
6455        tools:
6456          - name: run_workflow_graph
6457            input_schema:
6458              type: object
6459              properties:
6460                workflow_id: { type: string }
6461              required: [workflow_id]
6462    config:
6463      prompt: |
6464        Call with:
6465        {
6466          "workflow_id": "hr_warning_email_subgraph"
6467        }
6468edges: []
6469"#;
6470
6471        let subgraph_yaml = r#"
6472id: hr-warning-email-subgraph
6473entry_node: draft_hr_warning_email
6474nodes:
6475  - id: draft_hr_warning_email
6476    node_type:
6477      llm_call:
6478        model: gemini-3-flash
6479edges: []
6480"#;
6481
6482        fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
6483        fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
6484
6485        let mermaid =
6486            yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
6487
6488        assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
6489        assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
6490        assert!(mermaid.contains("calls hr_warning_email_subgraph"));
6491        assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
6492
6493        fs::remove_dir_all(base_dir).expect("temp dir removed");
6494    }
6495
6496    #[test]
6497    fn converts_yaml_workflow_to_ir_definition() {
6498        let yaml = r#"
6499id: chat-workflow
6500entry_node: classify
6501nodes:
6502  - id: classify
6503    node_type:
6504      llm_call:
6505        model: gpt-4.1
6506    config:
6507      prompt: |
6508        classify
6509  - id: route
6510    node_type:
6511      switch:
6512        branches:
6513          - condition: '$.nodes.classify.output.kind == "x"'
6514            target: done
6515        default: done
6516  - id: done
6517    node_type:
6518      custom_worker:
6519        handler: GetRagData
6520    config:
6521      payload:
6522        topic: test
6523edges:
6524  - from: classify
6525    to: route
6526"#;
6527
6528        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6529        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
6530
6531        assert_eq!(ir.name, "chat-workflow");
6532        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
6533        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
6534        assert!(ir.nodes.iter().any(|n| n.id == "route"));
6535        assert!(ir.nodes.iter().any(|n| n.id == "done"));
6536    }
6537
6538    #[test]
6539    fn supports_yaml_to_ir_when_messages_path_is_used() {
6540        let yaml = r#"
6541id: chat-workflow
6542entry_node: classify
6543nodes:
6544  - id: classify
6545    node_type:
6546      llm_call:
6547        model: gpt-4.1
6548        messages_path: input.messages
6549"#;
6550
6551        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6552        let ir =
6553            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
6554        assert!(ir.nodes.iter().any(|node| matches!(
6555            node.kind,
6556            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
6557        )));
6558    }
6559
6560    #[tokio::test]
6561    async fn workflow_output_contains_trace_id_in_both_locations() {
6562        let yaml = r#"
6563id: trace-test
6564entry_node: classify
6565nodes:
6566  - id: classify
6567    node_type:
6568      llm_call:
6569        model: gpt-4.1
6570    config:
6571      prompt: |
6572        Classify this email into exactly one category:
6573        {{ input.email_text }}
6574"#;
6575
6576        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6577        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
6578            .await
6579            .expect("workflow should execute");
6580
6581        let trace_id = output
6582            .trace_id
6583            .as_deref()
6584            .expect("trace_id should be present");
6585        assert!(!trace_id.is_empty());
6586        assert_eq!(
6587            output.metadata.as_ref().and_then(|value| {
6588                value
6589                    .get("telemetry")
6590                    .and_then(|telemetry| telemetry.get("trace_id"))
6591                    .and_then(Value::as_str)
6592            }),
6593            Some(trace_id)
6594        );
6595    }
6596
6597    #[tokio::test]
6598    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
6599        let yaml = r#"
6600id: sample-rate-zero
6601entry_node: classify
6602nodes:
6603  - id: classify
6604    node_type:
6605      llm_call:
6606        model: gpt-4.1
6607    config:
6608      prompt: |
6609        Classify this email into exactly one category:
6610        {{ input.email_text }}
6611"#;
6612
6613        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6614        let options = YamlWorkflowRunOptions {
6615            telemetry: YamlWorkflowTelemetryConfig {
6616                sample_rate: 0.0,
6617                ..YamlWorkflowTelemetryConfig::default()
6618            },
6619            trace: YamlWorkflowTraceOptions {
6620                context: Some(YamlWorkflowTraceContextInput {
6621                    trace_id: Some("trace-sample-zero".to_string()),
6622                    ..YamlWorkflowTraceContextInput::default()
6623                }),
6624                tenant: YamlWorkflowTraceTenantContext::default(),
6625            },
6626            model: None,
6627        };
6628
6629        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6630            &workflow,
6631            &json!({"email_text":"hello"}),
6632            &MockExecutor,
6633            None,
6634            None,
6635            &options,
6636        )
6637        .await
6638        .expect("workflow should execute");
6639
6640        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
6641        assert_eq!(
6642            output
6643                .metadata
6644                .as_ref()
6645                .and_then(|value| value.get("telemetry"))
6646                .and_then(|telemetry| telemetry.get("sampled"))
6647                .and_then(Value::as_bool),
6648            Some(false)
6649        );
6650    }
6651
6652    #[test]
6653    fn trace_id_from_traceparent_parses_w3c_header() {
6654        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
6655        assert_eq!(
6656            trace_id_from_traceparent(traceparent).as_deref(),
6657            Some("4bf92f3577b34da6a3ce929d0e0e4736")
6658        );
6659    }
6660
6661    #[test]
6662    fn resolve_telemetry_context_marks_traceparent_source() {
6663        let mut options = YamlWorkflowRunOptions::default();
6664        options.trace.context = Some(YamlWorkflowTraceContextInput {
6665            traceparent: Some(
6666                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
6667            ),
6668            ..YamlWorkflowTraceContextInput::default()
6669        });
6670
6671        let context = resolve_telemetry_context(&options, None);
6672
6673        assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
6674        assert_eq!(
6675            context.trace_id.as_deref(),
6676            Some("4bf92f3577b34da6a3ce929d0e0e4736")
6677        );
6678    }
6679
6680    #[tokio::test]
6681    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
6682        let yaml = r#"
6683id: traceparent-derived
6684entry_node: classify
6685nodes:
6686  - id: classify
6687    node_type:
6688      llm_call:
6689        model: gpt-4.1
6690    config:
6691      prompt: |
6692        Classify this email into exactly one category:
6693        {{ input.email_text }}
6694"#;
6695
6696        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6697        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
6698        let options = YamlWorkflowRunOptions {
6699            telemetry: YamlWorkflowTelemetryConfig {
6700                sample_rate: 0.0,
6701                ..YamlWorkflowTelemetryConfig::default()
6702            },
6703            trace: YamlWorkflowTraceOptions {
6704                context: Some(YamlWorkflowTraceContextInput {
6705                    traceparent: Some(traceparent.to_string()),
6706                    ..YamlWorkflowTraceContextInput::default()
6707                }),
6708                tenant: YamlWorkflowTraceTenantContext::default(),
6709            },
6710            model: None,
6711        };
6712
6713        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6714            &workflow,
6715            &json!({"email_text":"hello"}),
6716            &MockExecutor,
6717            None,
6718            None,
6719            &options,
6720        )
6721        .await
6722        .expect("workflow should execute");
6723
6724        assert_eq!(
6725            output.trace_id.as_deref(),
6726            Some("4bf92f3577b34da6a3ce929d0e0e4736")
6727        );
6728        assert_eq!(
6729            output
6730                .metadata
6731                .as_ref()
6732                .and_then(|value| value.get("telemetry"))
6733                .and_then(|telemetry| telemetry.get("sampled"))
6734                .and_then(Value::as_bool),
6735            Some(false)
6736        );
6737    }
6738
6739    #[tokio::test]
6740    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
6741        let yaml = r#"
6742id: sample-rate-one
6743entry_node: classify
6744nodes:
6745  - id: classify
6746    node_type:
6747      llm_call:
6748        model: gpt-4.1
6749    config:
6750      prompt: |
6751        Classify this email into exactly one category:
6752        {{ input.email_text }}
6753"#;
6754
6755        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6756        let options = YamlWorkflowRunOptions {
6757            telemetry: YamlWorkflowTelemetryConfig {
6758                sample_rate: 1.0,
6759                ..YamlWorkflowTelemetryConfig::default()
6760            },
6761            trace: YamlWorkflowTraceOptions {
6762                context: Some(YamlWorkflowTraceContextInput {
6763                    trace_id: Some("trace-sample-one".to_string()),
6764                    ..YamlWorkflowTraceContextInput::default()
6765                }),
6766                tenant: YamlWorkflowTraceTenantContext::default(),
6767            },
6768            model: None,
6769        };
6770
6771        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6772            &workflow,
6773            &json!({"email_text":"hello"}),
6774            &MockExecutor,
6775            None,
6776            None,
6777            &options,
6778        )
6779        .await
6780        .expect("workflow should execute");
6781
6782        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
6783        assert_eq!(
6784            output
6785                .metadata
6786                .as_ref()
6787                .and_then(|value| value.get("telemetry"))
6788                .and_then(|telemetry| telemetry.get("sampled"))
6789                .and_then(Value::as_bool),
6790            Some(true)
6791        );
6792    }
6793
6794    #[tokio::test]
6795    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
6796        let yaml = r#"
6797id: sample-rate-deterministic
6798entry_node: classify
6799nodes:
6800  - id: classify
6801    node_type:
6802      llm_call:
6803        model: gpt-4.1
6804    config:
6805      prompt: |
6806        Classify this email into exactly one category:
6807        {{ input.email_text }}
6808"#;
6809
6810        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6811        let options = YamlWorkflowRunOptions {
6812            telemetry: YamlWorkflowTelemetryConfig {
6813                sample_rate: 0.5,
6814                ..YamlWorkflowTelemetryConfig::default()
6815            },
6816            trace: YamlWorkflowTraceOptions {
6817                context: Some(YamlWorkflowTraceContextInput {
6818                    trace_id: Some("trace-sample-deterministic".to_string()),
6819                    ..YamlWorkflowTraceContextInput::default()
6820                }),
6821                tenant: YamlWorkflowTraceTenantContext::default(),
6822            },
6823            model: None,
6824        };
6825
6826        let mut sampled_values = Vec::new();
6827        for _ in 0..3 {
6828            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
6829                &workflow,
6830                &json!({"email_text":"hello"}),
6831                &MockExecutor,
6832                None,
6833                None,
6834                &options,
6835            )
6836            .await
6837            .expect("workflow should execute");
6838
6839            let sampled = output
6840                .metadata
6841                .as_ref()
6842                .and_then(|value| value.get("telemetry"))
6843                .and_then(|telemetry| telemetry.get("sampled"))
6844                .and_then(Value::as_bool)
6845                .expect("sampled flag should be present");
6846            sampled_values.push(sampled);
6847        }
6848
6849        assert!(sampled_values
6850            .iter()
6851            .all(|value| *value == sampled_values[0]));
6852    }
6853
6854    #[tokio::test]
6855    async fn workflow_run_options_reject_invalid_sample_rate() {
6856        let yaml = r#"
6857id: sample-rate-invalid
6858entry_node: classify
6859nodes:
6860  - id: classify
6861    node_type:
6862      llm_call:
6863        model: gpt-4.1
6864    config:
6865      prompt: |
6866        Classify this email into exactly one category:
6867        {{ input.email_text }}
6868"#;
6869
6870        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6871        let options = YamlWorkflowRunOptions {
6872            telemetry: YamlWorkflowTelemetryConfig {
6873                sample_rate: 1.1,
6874                ..YamlWorkflowTelemetryConfig::default()
6875            },
6876            ..YamlWorkflowRunOptions::default()
6877        };
6878
6879        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6880            &workflow,
6881            &json!({"email_text":"hello"}),
6882            &MockExecutor,
6883            None,
6884            None,
6885            &options,
6886        )
6887        .await
6888        .expect_err("invalid sample_rate should fail");
6889
6890        match err {
6891            YamlWorkflowRunError::InvalidInput { message } => {
6892                assert!(message.contains("telemetry.sample_rate"));
6893            }
6894            _ => panic!("expected invalid input error for sample_rate"),
6895        }
6896    }
6897
6898    #[tokio::test]
6899    async fn workflow_run_options_reject_nan_sample_rate() {
6900        let yaml = r#"
6901id: sample-rate-invalid
6902entry_node: classify
6903nodes:
6904  - id: classify
6905    node_type:
6906      llm_call:
6907        model: gpt-4.1
6908    config:
6909      prompt: |
6910        Classify this email into exactly one category:
6911        {{ input.email_text }}
6912"#;
6913
6914        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6915        let options = YamlWorkflowRunOptions {
6916            telemetry: YamlWorkflowTelemetryConfig {
6917                sample_rate: f32::NAN,
6918                ..YamlWorkflowTelemetryConfig::default()
6919            },
6920            ..YamlWorkflowRunOptions::default()
6921        };
6922
6923        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
6924            &workflow,
6925            &json!({"email_text":"hello"}),
6926            &MockExecutor,
6927            None,
6928            None,
6929            &options,
6930        )
6931        .await
6932        .expect_err("nan sample_rate should fail");
6933
6934        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
6935    }
6936
6937    #[test]
6938    fn subworkflow_options_inherit_parent_telemetry_configuration() {
6939        let mut parent_options = YamlWorkflowRunOptions::default();
6940        parent_options.telemetry.sample_rate = 0.0;
6941        parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
6942        parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
6943        parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
6944
6945        let parent_context = TraceContext {
6946            trace_id: Some("trace-parent".to_string()),
6947            span_id: Some("span-parent".to_string()),
6948            parent_span_id: None,
6949            traceparent: Some("00-trace-parent-span-parent-01".to_string()),
6950            tracestate: None,
6951            baggage: BTreeMap::new(),
6952        };
6953
6954        let subworkflow_options =
6955            build_subworkflow_options(&parent_options, Some(&parent_context), None);
6956
6957        assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
6958        assert_eq!(
6959            subworkflow_options.telemetry.payload_mode,
6960            YamlWorkflowPayloadMode::RedactedPayload
6961        );
6962        assert_eq!(
6963            subworkflow_options.telemetry.tool_trace_mode,
6964            YamlToolTraceMode::Redacted
6965        );
6966        assert_eq!(
6967            subworkflow_options.trace.tenant.conversation_id.as_deref(),
6968            Some("conv-123")
6969        );
6970        assert_eq!(
6971            subworkflow_options
6972                .trace
6973                .context
6974                .as_ref()
6975                .and_then(|ctx| ctx.trace_id.as_deref()),
6976            Some("trace-parent")
6977        );
6978    }
6979
6980    #[tokio::test]
6981    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
6982        let yaml = r#"
6983id: trace-options-test
6984entry_node: classify
6985nodes:
6986  - id: classify
6987    node_type:
6988      llm_call:
6989        model: gpt-4.1
6990    config:
6991      prompt: |
6992        Classify this email into exactly one category:
6993        {{ input.email_text }}
6994"#;
6995
6996        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6997        let options = YamlWorkflowRunOptions {
6998            telemetry: YamlWorkflowTelemetryConfig {
6999                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
7000                ..YamlWorkflowTelemetryConfig::default()
7001            },
7002            trace: YamlWorkflowTraceOptions {
7003                context: Some(YamlWorkflowTraceContextInput {
7004                    trace_id: Some("trace-fixed-123".to_string()),
7005                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
7006                    ..YamlWorkflowTraceContextInput::default()
7007                }),
7008                tenant: YamlWorkflowTraceTenantContext {
7009                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
7010                    ..YamlWorkflowTraceTenantContext::default()
7011                },
7012            },
7013            model: None,
7014        };
7015
7016        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7017            &workflow,
7018            &json!({"email_text":"hello"}),
7019            &MockExecutor,
7020            None,
7021            None,
7022            &options,
7023        )
7024        .await
7025        .expect("workflow should execute");
7026
7027        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
7028        assert_eq!(
7029            output
7030                .metadata
7031                .as_ref()
7032                .and_then(|value| value.get("telemetry"))
7033                .and_then(|telemetry| telemetry.get("payload_mode"))
7034                .and_then(Value::as_str),
7035            Some("redacted_payload")
7036        );
7037        assert_eq!(
7038            output
7039                .metadata
7040                .as_ref()
7041                .and_then(|value| value.get("trace"))
7042                .and_then(|trace| trace.get("tenant"))
7043                .and_then(|tenant| tenant.get("conversation_id"))
7044                .and_then(Value::as_str),
7045            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
7046        );
7047    }
7048
7049    #[test]
7050    fn streamed_payload_parser_extracts_last_json_object() {
7051        let raw = r#"{"state":"missing_scenario","reason":"ok"}
7052
7053extra reasoning text
7054
7055{"state":"ready","reason":"final"}"#;
7056
7057        let resolved = parse_streamed_structured_payload(raw, false)
7058            .expect("parser should extract final JSON object");
7059        assert_eq!(resolved.payload["state"], "ready");
7060        assert!(resolved.heal_confidence.is_none());
7061    }
7062
7063    #[test]
7064    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
7065        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7066
7067        let resolved = parse_streamed_structured_payload(raw, false)
7068            .expect("parser should recover final structured JSON object");
7069        assert_eq!(resolved.payload["state"], "ready");
7070    }
7071
7072    #[test]
7073    fn streamed_payload_parser_handles_markdown_with_heal() {
7074        let raw = r#"Some preface
7075```json
7076{
7077  "state": "missing_scenario",
7078  "reason": "Need more details"
7079}
7080```
7081Some trailing explanation"#;
7082
7083        let resolved = parse_streamed_structured_payload(raw, true)
7084            .expect("heal path should parse JSON block");
7085        assert_eq!(resolved.payload["state"], "missing_scenario");
7086        assert!(resolved.heal_confidence.is_some());
7087    }
7088
7089    #[test]
7090    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7091        let raw = "No JSON in this streamed output";
7092        let error = parse_streamed_structured_payload(raw, false)
7093            .expect_err("strict stream parse should fail without JSON candidate");
7094        assert!(error.contains("no JSON object candidate found"));
7095    }
7096
7097    #[test]
7098    fn include_raw_stream_debug_events_defaults_to_false() {
7099        let _guard = stream_debug_env_lock()
7100            .lock()
7101            .expect("stream debug env lock should not be poisoned");
7102        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7103        assert!(!include_raw_stream_debug_events());
7104    }
7105
7106    #[test]
7107    fn include_raw_stream_debug_events_accepts_truthy_values() {
7108        let _guard = stream_debug_env_lock()
7109            .lock()
7110            .expect("stream debug env lock should not be poisoned");
7111        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7112        assert!(include_raw_stream_debug_events());
7113        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7114    }
7115
7116    #[test]
7117    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7118        let mut filter = StructuredJsonDeltaFilter::default();
7119        let chunks = vec![
7120            "I will think first... ",
7121            "{\"state\":\"missing_scenario\",",
7122            "\"reason\":\"Need more details\"}",
7123            " additional commentary",
7124        ];
7125
7126        let filtered = chunks
7127            .into_iter()
7128            .filter_map(|chunk| filter.split(chunk).0)
7129            .collect::<String>();
7130
7131        assert_eq!(
7132            filtered,
7133            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7134        );
7135    }
7136
7137    #[test]
7138    fn structured_json_delta_filter_handles_braces_inside_strings() {
7139        let mut filter = StructuredJsonDeltaFilter::default();
7140        let chunks = vec![
7141            "preface ",
7142            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7143            " trailing",
7144        ];
7145
7146        let filtered = chunks
7147            .into_iter()
7148            .filter_map(|chunk| filter.split(chunk).0)
7149            .collect::<String>();
7150
7151        assert_eq!(
7152            filtered,
7153            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7154        );
7155    }
7156
7157    #[test]
7158    fn render_json_object_as_text_converts_top_level_fields() {
7159        let rendered =
7160            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7161        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7162
7163        assert_eq!(lines.len(), 3);
7164        assert!(lines.contains("question: q"));
7165        assert!(lines.contains("confidence: 0.8"));
7166        assert!(lines.contains("nested: {\"a\":1}"));
7167    }
7168
7169    #[test]
7170    fn stream_json_as_text_formatter_emits_once_when_complete() {
7171        let mut formatter = StreamJsonAsTextFormatter::default();
7172        formatter.push("{\"question\":\"hello\"}");
7173
7174        let first = formatter.emit_if_ready(true);
7175        let second = formatter.emit_if_ready(true);
7176
7177        assert_eq!(first, Some("question: hello".to_string()));
7178        assert_eq!(second, None);
7179    }
7180
7181    #[test]
7182    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7183        let expr = "$.nodes.classify.output.output_total == 1";
7184        let rewritten = rewrite_yaml_condition_to_ir(expr);
7185        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7186    }
7187
7188    #[tokio::test]
7189    async fn validates_workflow_input_before_ir_runtime_path() {
7190        let yaml = r#"
7191id: chat-workflow
7192entry_node: classify
7193nodes:
7194  - id: classify
7195    node_type:
7196      llm_call:
7197        model: gpt-4.1
7198    config:
7199      prompt: |
7200        classify
7201"#;
7202
7203        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7204        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
7205            .await
7206            .expect_err("non-object input should fail before execution");
7207
7208        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7209    }
7210
7211    #[test]
7212    fn interpolate_template_supports_dollar_prefixed_paths() {
7213        let context = json!({
7214            "input": {
7215                "email_text": "hello"
7216            }
7217        });
7218
7219        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7220        assert_eq!(rendered, "value=hello");
7221    }
7222}