Skip to main content

simple_agents_workflow/
yaml_runner.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14    ToolFunction, ToolType,
15};
16use simple_agents_core::{
17    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{workflow_tracer, SpanKind, TraceContext, WorkflowSpan};
24use crate::runtime::{
25    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
26    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
27    WorkflowRuntimeOptions,
28};
29use crate::visualize::workflow_to_mermaid;
30
/// Reserved node id `"__yaml_start"`.
/// NOTE(review): presumably the synthetic start node used during IR conversion — usage is outside this section, confirm.
const YAML_START_NODE_ID: &str = "__yaml_start";
/// Reserved tool id `"__yaml_llm_call"`.
/// NOTE(review): presumably identifies the built-in LLM call tool — usage is outside this section, confirm.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

/// Process-wide counter, starting at 1, that `generate_trace_id` mixes into each generated id.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
35
/// Timing entry for a single workflow node, with optional token figures.
///
/// Every `Option` field is omitted from the serialized form when it is `None`.
/// `workflow_nerdstats` treats an `llm_call` step whose `total_tokens` is `None`
/// as a node without provider usage.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the node this entry describes.
    pub node_id: String,
    /// Node kind as a string (e.g. `"llm_call"`).
    pub node_kind: String,
    /// Elapsed time for the step, in milliseconds.
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
52
/// Token and timing metrics recorded for one LLM node.
///
/// Unlike `YamlStepTiming`, the token counts here are not optional; only
/// `thinking_tokens` may be absent, and it is then skipped during serialization.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Elapsed time, in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thinking_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
63
/// Serializable result of one workflow run: per-node outputs, timings and
/// aggregated token totals.
///
/// `workflow_nerdstats` builds its summary object from this struct. Optional
/// fields (`ttft_ms`, `total_thinking_tokens`, `trace_id`, `metadata`) are
/// omitted from the serialized form when `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    // NOTE(review): name suggests the raw workflow input text; confirm against the runner.
    pub email_text: String,
    /// Ordered trace entries, one string each.
    pub trace: Vec<String>,
    /// JSON outputs keyed by string (presumably node id — confirm against the runner).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    /// LLM metrics keyed by string (presumably node id — confirm against the runner).
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub total_elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_thinking_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
89
/// Controls how payloads are rendered by `payload_for_span`.
///
/// Serialized in snake_case (`full_payload`, `redacted_payload`); the default
/// is `FullPayload`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    /// The payload is rendered in full.
    #[default]
    FullPayload,
    /// The payload is replaced by a `{"redacted": true, "value_type": ...}` stub.
    RedactedPayload,
}
97
/// Controls how tool payloads are recorded by `payload_for_tool_trace`.
///
/// Serialized in snake_case (`full`, `redacted`, `off`); the default is `Full`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    /// The payload is cloned as-is.
    #[default]
    Full,
    /// The payload is replaced by a redaction stub that only carries its JSON type name.
    Redacted,
    /// The payload is replaced by JSON `null`.
    Off,
}
106
/// Caller-supplied trace context; every field is optional on input.
///
/// `trace_context_from_options` copies these fields one-to-one into a
/// `TraceContext`. Missing fields deserialize to `None` / an empty map.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// Key/value baggage entries; empty when not provided.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
122
/// Optional tenant identifiers attached to traces and run metadata.
///
/// Each present field becomes a `tenant.*` span attribute in
/// `apply_trace_tenant_attributes` and is echoed under `trace.tenant` by
/// `workflow_metadata_with_trace`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
136
/// Telemetry settings for a workflow run.
///
/// Fields missing during deserialization fall back to the helper defaults:
/// `true` for the booleans, `1.0` for `sample_rate`, `30` for `retention_days`,
/// and the enum defaults for the two mode fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    // NOTE(review): presumably a fraction in 0.0..=1.0; no range check is visible in this section.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
154
155impl Default for YamlWorkflowTelemetryConfig {
156    fn default() -> Self {
157        Self {
158            enabled: true,
159            nerdstats: true,
160            sample_rate: 1.0,
161            payload_mode: YamlWorkflowPayloadMode::FullPayload,
162            retention_days: 30,
163            multi_tenant: true,
164            tool_trace_mode: YamlToolTraceMode::Full,
165        }
166    }
167}
168
/// Trace-related run options: an optional inbound trace context plus tenant identifiers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    /// Caller-provided trace context, if any.
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    /// Tenant identifiers; all-`None` when not provided.
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
176
/// Top-level options for a workflow run; both sections default when omitted.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
}
184
/// Token usage for one LLM call; accumulated by `YamlTokenTotals::add_usage`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Thinking-token count when available; serialized as `null` when absent.
    pub thinking_tokens: Option<u32>,
}
192
/// Result of one LLM execution: the JSON payload plus optional usage, timing and tool-call traces.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    /// Token usage, when available.
    pub usage: Option<YamlLlmTokenUsage>,
    // NOTE(review): presumably time-to-first-token in milliseconds; confirm where it is measured.
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
200
/// Trace record for a single tool call.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    /// Tool arguments as JSON.
    pub arguments: Value,
    /// Tool output as JSON, if any.
    pub output: Option<Value>,
    /// Status string; the set of values is defined by the code that builds these records.
    pub status: String,
    /// Elapsed time, in milliseconds.
    pub elapsed_ms: u128,
    /// Error message, if any.
    pub error: Option<String>,
}
211
/// Running token totals across LLM calls; all counters start at zero / `None`.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    /// Stays `None` until at least one added usage reports thinking tokens.
    thinking_tokens: Option<u64>,
}
219
220impl YamlTokenTotals {
221    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
222        self.input_tokens += u64::from(usage.prompt_tokens);
223        self.output_tokens += u64::from(usage.completion_tokens);
224        self.total_tokens += u64::from(usage.total_tokens);
225
226        if let Some(thinking_tokens) = usage.thinking_tokens {
227            let next = self.thinking_tokens.unwrap_or(0) + u64::from(thinking_tokens);
228            self.thinking_tokens = Some(next);
229        }
230    }
231
232    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
233        if elapsed_ms == 0 {
234            return 0.0;
235        }
236        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
237    }
238}
239
/// Rounds `value` to two decimal places (half away from zero, per `f64::round`).
fn round_two_decimals(value: f64) -> f64 {
    let hundredths = value * 100.0;
    hundredths.round() / 100.0
}
243
244fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
245    if elapsed_ms == 0 {
246        return 0.0;
247    }
248    round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
249}
250
/// Serde default for boolean telemetry flags: `true`.
fn default_true() -> bool {
    true
}

/// Serde default for `sample_rate`: `1.0`.
fn default_sample_rate() -> f32 {
    1.0_f32
}

/// Serde default for `retention_days`: `30`.
fn default_retention_days() -> u32 {
    30_u32
}
262
263fn generate_trace_id() -> String {
264    use std::time::{SystemTime, UNIX_EPOCH};
265
266    let now_nanos = SystemTime::now()
267        .duration_since(UNIX_EPOCH)
268        .map(|duration| duration.as_nanos())
269        .unwrap_or(0);
270    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
271    format!("{:032x}", now_nanos ^ sequence)
272}
273
274fn resolve_trace_id(options: &YamlWorkflowRunOptions, span_context: &TraceContext) -> String {
275    options
276        .trace
277        .context
278        .as_ref()
279        .and_then(|context| context.trace_id.clone())
280        .or_else(|| span_context.trace_id.clone())
281        .unwrap_or_else(generate_trace_id)
282}
283
284fn workflow_metadata_with_trace(options: &YamlWorkflowRunOptions, trace_id: &str) -> Value {
285    json!({
286        "telemetry": {
287            "trace_id": trace_id,
288            "enabled": options.telemetry.enabled,
289            "nerdstats": options.telemetry.nerdstats,
290            "sample_rate": options.telemetry.sample_rate,
291            "payload_mode": match options.telemetry.payload_mode {
292                YamlWorkflowPayloadMode::FullPayload => "full_payload",
293                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
294            },
295            "retention_days": options.telemetry.retention_days,
296            "multi_tenant": options.telemetry.multi_tenant,
297            "tool_trace_mode": match options.telemetry.tool_trace_mode {
298                YamlToolTraceMode::Full => "full",
299                YamlToolTraceMode::Redacted => "redacted",
300                YamlToolTraceMode::Off => "off",
301            },
302        },
303        "trace": {
304            "tenant": {
305                "workspace_id": options.trace.tenant.workspace_id,
306                "user_id": options.trace.tenant.user_id,
307                "conversation_id": options.trace.tenant.conversation_id,
308                "request_id": options.trace.tenant.request_id,
309                "run_id": options.trace.tenant.run_id,
310            }
311        },
312    })
313}
314
315fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
316    if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
317        span.set_attribute("tenant.workspace_id", workspace_id);
318    }
319    if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
320        span.set_attribute("tenant.user_id", user_id);
321    }
322    if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
323        span.set_attribute("tenant.conversation_id", conversation_id);
324    }
325    if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
326        span.set_attribute("tenant.request_id", request_id);
327    }
328    if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
329        span.set_attribute("tenant.run_id", run_id);
330    }
331}
332
333fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
334    let llm_nodes_without_usage: Vec<String> = output
335        .step_timings
336        .iter()
337        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
338        .map(|step| step.node_id.clone())
339        .collect();
340    let token_metrics_available = llm_nodes_without_usage.is_empty();
341    let token_metrics_source = if token_metrics_available {
342        "provider_usage"
343    } else {
344        "provider_stream_usage_unavailable"
345    };
346    let total_input_tokens = if token_metrics_available {
347        json!(output.total_input_tokens)
348    } else {
349        Value::Null
350    };
351    let total_output_tokens = if token_metrics_available {
352        json!(output.total_output_tokens)
353    } else {
354        Value::Null
355    };
356    let total_tokens = if token_metrics_available {
357        json!(output.total_tokens)
358    } else {
359        Value::Null
360    };
361    let total_thinking_tokens = if token_metrics_available {
362        json!(output.total_thinking_tokens)
363    } else {
364        Value::Null
365    };
366    let tokens_per_second = if token_metrics_available {
367        json!(output.tokens_per_second)
368    } else {
369        Value::Null
370    };
371
372    json!({
373        "workflow_id": output.workflow_id,
374        "terminal_node": output.terminal_node,
375        "total_elapsed_ms": output.total_elapsed_ms,
376        "ttft_ms": output.ttft_ms,
377        "step_timings": output.step_timings,
378        "llm_node_metrics": output.llm_node_metrics,
379        "total_input_tokens": total_input_tokens,
380        "total_output_tokens": total_output_tokens,
381        "total_tokens": total_tokens,
382        "total_thinking_tokens": total_thinking_tokens,
383        "tokens_per_second": tokens_per_second,
384        "trace_id": output.trace_id,
385        "token_metrics_available": token_metrics_available,
386        "token_metrics_source": token_metrics_source,
387        "llm_nodes_without_usage": llm_nodes_without_usage,
388    })
389}
390
391fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
392    match mode {
393        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
394        YamlWorkflowPayloadMode::RedactedPayload => json!({
395            "redacted": true,
396            "value_type": match payload {
397                Value::Null => "null",
398                Value::Bool(_) => "bool",
399                Value::Number(_) => "number",
400                Value::String(_) => "string",
401                Value::Array(_) => "array",
402                Value::Object(_) => "object",
403            }
404        })
405        .to_string(),
406    }
407}
408
409fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
410    match mode {
411        YamlToolTraceMode::Full => payload.clone(),
412        YamlToolTraceMode::Redacted => json!({
413            "redacted": true,
414            "value_type": json_type_name(payload),
415        }),
416        YamlToolTraceMode::Off => Value::Null,
417    }
418}
419
420fn validate_json_schema(schema: &Value) -> Result<(), String> {
421    jsonschema::JSONSchema::compile(schema)
422        .map(|_| ())
423        .map_err(|error| format!("invalid JSON schema: {error}"))
424}
425
426fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
427    let validator = jsonschema::JSONSchema::compile(schema)
428        .map_err(|error| format!("invalid JSON schema: {error}"))?;
429    if let Err(errors) = validator.validate(instance) {
430        let message = errors
431            .into_iter()
432            .next()
433            .map(|error| error.to_string())
434            .unwrap_or_else(|| "unknown schema validation error".to_string());
435        return Err(format!("schema validation failed: {message}"));
436    }
437    Ok(())
438}
439
440fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
441    options.trace.context.as_ref().map(|input| TraceContext {
442        trace_id: input.trace_id.clone(),
443        span_id: input.span_id.clone(),
444        parent_span_id: input.parent_span_id.clone(),
445        traceparent: input.traceparent.clone(),
446        tracestate: input.tracestate.clone(),
447        baggage: input.baggage.clone(),
448    })
449}
450
451fn merged_trace_context_for_worker(
452    span_context: Option<&TraceContext>,
453    resolved_trace_id: Option<&str>,
454    options: &YamlWorkflowRunOptions,
455) -> TraceContext {
456    let input_context = options.trace.context.as_ref();
457    let baggage = if let Some(context) = span_context {
458        if !context.baggage.is_empty() {
459            context.baggage.clone()
460        } else {
461            input_context
462                .map(|value| value.baggage.clone())
463                .unwrap_or_default()
464        }
465    } else {
466        input_context
467            .map(|value| value.baggage.clone())
468            .unwrap_or_default()
469    };
470
471    TraceContext {
472        trace_id: span_context
473            .and_then(|context| context.trace_id.clone())
474            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
475            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
476        span_id: span_context
477            .and_then(|context| context.span_id.clone())
478            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
479        parent_span_id: span_context
480            .and_then(|context| context.parent_span_id.clone())
481            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
482        traceparent: span_context
483            .and_then(|context| context.traceparent.clone())
484            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
485        tracestate: span_context
486            .and_then(|context| context.tracestate.clone())
487            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
488        baggage,
489    }
490}
491
492fn custom_worker_context_with_trace(
493    context: &Value,
494    trace_context: &TraceContext,
495    tenant_context: &YamlWorkflowTraceTenantContext,
496) -> Value {
497    let mut context_with_trace = context.clone();
498    let Some(root) = context_with_trace.as_object_mut() else {
499        return context_with_trace;
500    };
501
502    root.insert(
503        "trace".to_string(),
504        json!({
505            "context": {
506                "trace_id": trace_context.trace_id,
507                "span_id": trace_context.span_id,
508                "parent_span_id": trace_context.parent_span_id,
509                "traceparent": trace_context.traceparent,
510                "tracestate": trace_context.tracestate,
511                "baggage": trace_context.baggage,
512            },
513            "tenant": {
514                "workspace_id": tenant_context.workspace_id,
515                "user_id": tenant_context.user_id,
516                "conversation_id": tenant_context.conversation_id,
517                "request_id": tenant_context.request_id,
518                "run_id": tenant_context.run_id,
519            }
520        }),
521    );
522
523    context_with_trace
524}
525
/// Reports whether `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` is set to a truthy value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, compared after trimming and
/// ASCII-lowercasing. An unset or non-Unicode variable counts as `false`.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            let normalized = value.trim().to_ascii_lowercase();
            matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
        })
        .unwrap_or(false)
}
535
/// Outcome of `parse_streamed_structured_payload`.
#[derive(Debug)]
struct StreamedPayloadResolution {
    /// The parsed (or healed) JSON payload.
    payload: Value,
    /// Confidence reported by the healing parser; `None` when healing was not used.
    heal_confidence: Option<f32>,
}
541
542#[derive(Debug, Default)]
543struct StreamJsonAsTextFormatter {
544    raw_json: String,
545    emitted: bool,
546}
547
548impl StreamJsonAsTextFormatter {
549    fn push(&mut self, chunk: &str) {
550        self.raw_json.push_str(chunk);
551    }
552
553    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
554        if self.emitted || !complete {
555            return None;
556        }
557        self.emitted = true;
558        Some(render_json_object_as_text(self.raw_json.as_str()))
559    }
560}
561
562fn render_json_object_as_text(raw_json: &str) -> String {
563    let value = match serde_json::from_str::<Value>(raw_json) {
564        Ok(value) => value,
565        Err(_) => return raw_json.to_string(),
566    };
567    let Some(object) = value.as_object() else {
568        return raw_json.to_string();
569    };
570
571    let mut lines = Vec::with_capacity(object.len());
572    for (key, value) in object {
573        let rendered = match value {
574            Value::String(text) => text.clone(),
575            _ => value.to_string(),
576        };
577        lines.push(format!("{key}: {rendered}"));
578    }
579    lines.join("\n")
580}
581
/// Classifies a streamed token delta; serialized as `output` or `thinking`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
588
/// Incremental splitter that separates the first top-level JSON object in a
/// character stream from the text around it.
///
/// State persists across `split` calls so an object may span several deltas.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    /// The opening `{` has been seen.
    started: bool,
    /// The matching closing `}` has been seen.
    completed: bool,
    /// Current brace nesting depth inside the object.
    depth: u32,
    /// Currently inside a JSON string literal.
    in_string: bool,
    /// The previous character inside a string was a backslash.
    escape: bool,
}

impl StructuredJsonDeltaFilter {
    /// Splits `delta` into `(output, thinking)`.
    ///
    /// Characters belonging to the first balanced `{...}` object go to
    /// `output`; everything before its opening brace and after its closing
    /// brace goes to `thinking`. Each side is `None` when it received nothing.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            let outside_object = self.completed || (!self.started && ch != '{');
            if outside_object {
                thinking.push(ch);
            } else if !self.started {
                // First opening brace: the object begins here.
                self.started = true;
                self.depth = 1;
                output.push(ch);
            } else {
                output.push(ch);
                self.advance_json_state(ch);
            }
        }

        let non_empty = |text: String| if text.is_empty() { None } else { Some(text) };
        (non_empty(output), non_empty(thinking))
    }

    /// Updates string/escape/depth tracking for one character inside the object.
    fn advance_json_state(&mut self, ch: char) {
        if self.in_string {
            if self.escape {
                // The escaped character is consumed verbatim.
                self.escape = false;
            } else if ch == '\\' {
                self.escape = true;
            } else if ch == '"' {
                self.in_string = false;
            }
            return;
        }

        match ch {
            '"' => self.in_string = true,
            '{' => self.depth = self.depth.saturating_add(1),
            '}' => {
                self.depth = self.depth.saturating_sub(1);
                if self.depth == 0 {
                    self.completed = true;
                }
            }
            _ => {}
        }
    }

    /// Whether the object's closing brace has been seen.
    fn completed(&self) -> bool {
        self.completed
    }
}
673
/// Returns the trimmed contents of the last ```` ```json ```` fenced block in `raw`.
///
/// Yields `None` when there is no such opening fence, when the last opening
/// fence has no closing fence, or when the block is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    let (_, after_open) = raw.rsplit_once("```json")?;
    let (body, _) = after_open.split_once("```")?;
    let candidate = body.trim();

    if candidate.is_empty() {
        None
    } else {
        Some(candidate)
    }
}
684
/// Scans `raw` from byte offset `start_index` and returns the slice up to and
/// including the `}` that brings the brace depth back to zero, trimmed.
///
/// Braces inside JSON string literals (with backslash escapes) are ignored.
/// Callers are expected to pass the offset of an opening `{`; the returned
/// slice always begins at `start_index`.
///
/// Returns `None` when:
/// - `start_index` is past the end of `raw` or not on a character boundary
///   (previously this panicked on the slice),
/// - a `}` is met before any `{`,
/// - the input ends before the braces balance.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    // Checked slicing: an invalid offset yields `None` rather than a panic.
    let tail = raw.get(start_index..)?;

    let mut depth = 0u32;
    let mut in_string = false;
    let mut escape = false;

    for (relative_index, ch) in tail.char_indices() {
        if in_string {
            if escape {
                // The escaped character is consumed verbatim.
                escape = false;
            } else if ch == '\\' {
                escape = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                // A closing brace with nothing open cannot start a balanced object.
                depth = depth.checked_sub(1)?;
                if depth == 0 {
                    let end = relative_index + ch.len_utf8();
                    return Some(tail[..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
725
726fn extract_last_parsable_object(raw: &str) -> Option<&str> {
727    let starts: Vec<usize> = raw
728        .char_indices()
729        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
730        .collect();
731
732    for start in starts.into_iter().rev() {
733        let Some(candidate) = extract_balanced_object_from(raw, start) else {
734            continue;
735        };
736        if serde_json::from_str::<Value>(candidate).is_ok() {
737            return Some(candidate);
738        }
739    }
740
741    None
742}
743
744fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
745    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
746}
747
748fn parse_streamed_structured_payload(
749    raw: &str,
750    heal: bool,
751) -> Result<StreamedPayloadResolution, String> {
752    if !heal {
753        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
754            return Ok(StreamedPayloadResolution {
755                payload,
756                heal_confidence: None,
757            });
758        }
759
760        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
761            "failed to parse streamed structured completion JSON: no JSON object candidate found"
762                .to_string()
763        })?;
764        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
765            format!(
766                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
767            )
768        })?;
769        return Ok(StreamedPayloadResolution {
770            payload,
771            heal_confidence: None,
772        });
773    }
774
775    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
776    let parser = JsonishParser::new();
777    let healed = parser
778        .parse(candidate)
779        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;
780
781    Ok(StreamedPayloadResolution {
782        payload: healed.value,
783        heal_confidence: Some(healed.confidence),
784    })
785}
786
/// Event delivered to a `YamlWorkflowEventSink` during a run.
///
/// Only `event_type` is mandatory; the remaining fields are optional.
/// NOTE(review): which fields accompany which event types is decided by the
/// emitting code, which is outside this section.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Streamed text fragment, when the event carries one.
    pub delta: Option<String>,
    /// Classification of `delta` as output or thinking text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    /// Elapsed time, in milliseconds.
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
801
/// Role of a workflow message; an alias for `simple_agent_type::message::Role`.
pub type WorkflowMessageRole = Role;
803
/// A chat message supplied to a workflow.
///
/// `name` and `tool_call_id` default to `None` when absent; `tool_call_id`
/// also accepts the camelCase key `toolCallId` on input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
813
/// Record of one template expression and the value it resolved to.
/// NOTE(review): the producer of these records is outside this section; the
/// field notes below follow the field names and should be confirmed.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the binding (presumably within the template — confirm).
    pub index: usize,
    pub expression: String,
    pub source_path: String,
    /// The resolved JSON value.
    pub resolved: Value,
    /// Type name of `resolved`, as a string.
    pub resolved_type: String,
    /// Presumably `true` when the source path could not be resolved — confirm.
    pub missing: bool,
}
823
/// Severity level of a `YamlWorkflowDiagnostic`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}
829
/// A single validation finding; carried in `YamlWorkflowRunError::Validation`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Node the finding refers to, if any.
    pub node_id: Option<String>,
    /// Diagnostic code string.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description of the finding.
    pub message: String,
}
837
/// Errors reported while loading, validating or running a YAML workflow.
///
/// Display text for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    /// The workflow file could not be read.
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    /// The workflow file is not valid YAML for the expected shape.
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Validation failed; `diagnostics` holds the individual findings.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
882
/// Receiver for workflow events; implementors must be `Send + Sync`.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called with each emitted event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Whether the sink has requested cancellation; the default implementation
    /// always returns `false`.
    fn is_cancelled(&self) -> bool {
        false
    }
}
890
/// Event sink that discards every event and never reports cancellation.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    /// Ignores the event.
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
896
/// Message text used when a run is cancelled through the event sink.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const CANCELLED_MESSAGE: &str = "workflow event callback cancelled";
    CANCELLED_MESSAGE
}
900
901fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
902    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
903}
904
/// Reasons a YAML workflow cannot be converted to the workflow IR.
///
/// Display text for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
914
915/// Render a YAML workflow graph as Mermaid flowchart.
916pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
917    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
918        return workflow_to_mermaid(&ir);
919    }
920
921    yaml_workflow_to_mermaid_fallback(workflow)
922}
923
924fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
925    let mut lines = Vec::new();
926    lines.push("flowchart TD".to_string());
927
928    for node in &workflow.nodes {
929        lines.push(format!(
930            "  {}[\"{}\\n({})\"]",
931            sanitize_mermaid_id(&node.id),
932            escape_mermaid_label(&node.id),
933            node.kind_name()
934        ));
935    }
936
937    let mut emitted: HashSet<(String, String, String)> = HashSet::new();
938
939    for edge in &workflow.edges {
940        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
941    }
942
943    for node in &workflow.nodes {
944        if let Some(switch) = node.node_type.switch.as_ref() {
945            for branch in &switch.branches {
946                emitted.insert((
947                    node.id.clone(),
948                    branch.condition.clone(),
949                    branch.target.clone(),
950                ));
951            }
952            emitted.insert((
953                node.id.clone(),
954                "default".to_string(),
955                switch.default.clone(),
956            ));
957        }
958    }
959
960    let mut edges = emitted.into_iter().collect::<Vec<_>>();
961    edges.sort();
962
963    for (from, label, to) in edges {
964        if label.is_empty() {
965            lines.push(format!(
966                "  {} --> {}",
967                sanitize_mermaid_id(&from),
968                sanitize_mermaid_id(&to)
969            ));
970        } else {
971            lines.push(format!(
972                "  {} -- \"{}\" --> {}",
973                sanitize_mermaid_id(&from),
974                escape_mermaid_label(&label),
975                sanitize_mermaid_id(&to)
976            ));
977        }
978    }
979
980    lines.join("\n")
981}
982
983/// Load a YAML workflow file and render it as Mermaid flowchart.
984pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
985    let contents =
986        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
987            path: workflow_path.display().to_string(),
988            source,
989        })?;
990
991    let workflow: YamlWorkflow =
992        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
993            path: workflow_path.display().to_string(),
994            source,
995        })?;
996
997    Ok(yaml_workflow_to_mermaid(&workflow))
998}
999
1000pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
1001    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
1002    if !known_ids.contains(workflow.entry_node.as_str()) {
1003        return Err(YamlToIrError::MissingEntry {
1004            entry_node: workflow.entry_node.clone(),
1005        });
1006    }
1007
1008    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
1009    for edge in &workflow.edges {
1010        outgoing
1011            .entry(edge.from.as_str())
1012            .or_default()
1013            .push(edge.to.as_str());
1014    }
1015
1016    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
1017    nodes.push(Node {
1018        id: YAML_START_NODE_ID.to_string(),
1019        kind: NodeKind::Start {
1020            next: workflow.entry_node.clone(),
1021        },
1022    });
1023
1024    for node in &workflow.nodes {
1025        if let Some(llm) = node.node_type.llm_call.as_ref() {
1026            if node
1027                .config
1028                .as_ref()
1029                .and_then(|c| c.set_globals.as_ref())
1030                .is_some()
1031                || node
1032                    .config
1033                    .as_ref()
1034                    .and_then(|c| c.update_globals.as_ref())
1035                    .is_some()
1036            {
1037                return Err(YamlToIrError::UnsupportedNode {
1038                    node_id: node.id.clone(),
1039                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
1040                        .to_string(),
1041                });
1042            }
1043
1044            if !llm.tools.is_empty() {
1045                return Err(YamlToIrError::UnsupportedNode {
1046                    node_id: node.id.clone(),
1047                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
1048                        .to_string(),
1049                });
1050            }
1051
1052            let next = single_next_for_node(&outgoing, &node.id)?;
1053            nodes.push(Node {
1054                id: node.id.clone(),
1055                kind: NodeKind::Tool {
1056                    tool: YAML_LLM_TOOL_ID.to_string(),
1057                    input: json!({
1058                        "node_id": node.id,
1059                        "model": llm.model,
1060                        "prompt_template": node
1061                            .config
1062                            .as_ref()
1063                            .and_then(|c| c.prompt.clone())
1064                            .unwrap_or_default(),
1065                        "stream": llm.stream.unwrap_or(false),
1066                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
1067                        "heal": llm.heal.unwrap_or(false),
1068                        "messages_path": llm.messages_path,
1069                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
1070                        "output_schema": node
1071                            .config
1072                            .as_ref()
1073                            .and_then(|c| c.output_schema.clone())
1074                            .unwrap_or_else(default_llm_output_schema),
1075                    }),
1076                    next,
1077                },
1078            });
1079            continue;
1080        }
1081
1082        if let Some(worker) = node.node_type.custom_worker.as_ref() {
1083            if node
1084                .config
1085                .as_ref()
1086                .and_then(|c| c.set_globals.as_ref())
1087                .is_some()
1088                || node
1089                    .config
1090                    .as_ref()
1091                    .and_then(|c| c.update_globals.as_ref())
1092                    .is_some()
1093            {
1094                return Err(YamlToIrError::UnsupportedNode {
1095                    node_id: node.id.clone(),
1096                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
1097                        .to_string(),
1098                });
1099            }
1100
1101            let next = single_next_for_node(&outgoing, &node.id)?;
1102            nodes.push(Node {
1103                id: node.id.clone(),
1104                kind: NodeKind::Tool {
1105                    tool: worker.handler.clone(),
1106                    input: node
1107                        .config
1108                        .as_ref()
1109                        .and_then(|c| c.payload.clone())
1110                        .unwrap_or_else(|| json!({})),
1111                    next,
1112                },
1113            });
1114            continue;
1115        }
1116
1117        if let Some(switch) = node.node_type.switch.as_ref() {
1118            nodes.push(Node {
1119                id: node.id.clone(),
1120                kind: NodeKind::Router {
1121                    routes: switch
1122                        .branches
1123                        .iter()
1124                        .map(|b| RouterRoute {
1125                            when: rewrite_yaml_condition_to_ir(&b.condition),
1126                            next: b.target.clone(),
1127                        })
1128                        .collect(),
1129                    default: switch.default.clone(),
1130                },
1131            });
1132            continue;
1133        }
1134
1135        return Err(YamlToIrError::UnsupportedNode {
1136            node_id: node.id.clone(),
1137            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
1138        });
1139    }
1140
1141    Ok(WorkflowDefinition {
1142        version: WORKFLOW_IR_V0.to_string(),
1143        name: workflow.id.clone(),
1144        nodes,
1145    })
1146}
1147
1148fn single_next_for_node(
1149    outgoing: &HashMap<&str, Vec<&str>>,
1150    node_id: &str,
1151) -> Result<Option<String>, YamlToIrError> {
1152    match outgoing.get(node_id) {
1153        None => Ok(None),
1154        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1155        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1156            node_id: node_id.to_string(),
1157        }),
1158    }
1159}
1160
/// Translate a YAML switch condition into the path layout used by the IR.
///
/// Three textual rewrites are applied, in order:
/// 1. every `$.nodes.` becomes `$.node_outputs.`,
/// 2. every `.output.` segment collapses to `.`,
/// 3. a trailing `.output` is dropped.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    const TRAILING_OUTPUT: &str = ".output";

    let mut condition = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    if condition.ends_with(TRAILING_OUTPUT) {
        let kept = condition.len() - TRAILING_OUTPUT.len();
        condition.truncate(kept);
    }
    condition
}
1171
/// Turn an arbitrary node id into an identifier-shaped Mermaid node id.
///
/// Ids that do not begin with an ASCII letter or `_` (including the empty
/// string) are prefixed with `n_`. Every character that is not ASCII
/// alphanumeric or `_` is then replaced by `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_like_identifier = matches!(
        id.chars().next(),
        Some(first) if first.is_ascii_alphabetic() || first == '_'
    );
    let prefix = if starts_like_identifier { "" } else { "n_" };

    prefix
        .chars()
        .chain(id.chars())
        .map(|ch| match ch {
            c if c.is_ascii_alphanumeric() || c == '_' => c,
            _ => '_',
        })
        .collect()
}
1195
/// Escape a label for use inside a double-quoted Mermaid string by putting a
/// backslash in front of every `"`. All other characters pass through as-is.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
1199
/// Everything a [`YamlWorkflowLlmExecutor`] receives for one `llm_call` node.
///
/// Field notes describe how the client-backed executor in this file uses them;
/// other executors may interpret them differently.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    /// Id of the workflow node being executed; used in emitted events and error messages.
    pub node_id: String,
    // NOTE(review): presumably marks the node whose output ends the workflow — confirm against caller.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    /// Model name passed to the completion request.
    pub model: String,
    /// Optional message history. When absent, a default system + user message pair is used.
    pub messages: Option<Vec<Message>>,
    /// When `messages` is present, append `prompt` as a user message if it is not blank.
    pub append_prompt_as_user: bool,
    /// Prompt text sent as the user message.
    pub prompt: String,
    // NOTE(review): presumably the unrendered template and the bindings used to produce `prompt` — confirm.
    pub prompt_template: String,
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    // NOTE(review): presumably the JSON schema expected for the structured output — confirm.
    pub schema: Value,
    /// Requests a streaming completion; rejected when `tools` is non-empty.
    pub stream: bool,
    pub heal: bool,
    /// Tools offered to the model; a non-empty list enables the tool-call loop.
    pub tools: Vec<YamlResolvedTool>,
    /// Optional tool-choice setting forwarded to the completion request.
    pub tool_choice: Option<ToolChoice>,
    /// Maximum number of tool-call roundtrips before the node fails.
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    /// Controls tool-call event emission; `Off` suppresses the events.
    pub tool_trace_mode: YamlToolTraceMode,
    /// Context value handed to the custom worker on tool calls.
    pub execution_context: Value,
    /// Email text handed to the custom worker on tool calls.
    pub email_text: String,
}
1222
/// A tool made available to an `llm_call` node.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    /// Definition sent to the model; its function name is matched against requested tool calls.
    pub definition: ToolDefinition,
    /// Optional JSON schema the tool's output is validated against.
    pub output_schema: Option<Value>,
}
1228
/// Backend that executes a single `llm_call` node and returns its structured result.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Run the completion described by `request`.
    ///
    /// `event_sink`, when provided, may be used to emit events during execution.
    /// Failures are reported as a plain error message.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1237
/// Backend that runs custom worker handlers on behalf of a workflow.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Execute the handler named `handler` with the given JSON `payload`.
    ///
    /// `email_text` and `context` are supplied by the workflow run. Returns the
    /// handler's JSON output, or an error message on failure.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1248
1249pub async fn run_workflow_yaml_file(
1250    workflow_path: &Path,
1251    workflow_input: &Value,
1252    executor: &dyn YamlWorkflowLlmExecutor,
1253) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1254    let contents =
1255        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1256            path: workflow_path.display().to_string(),
1257            source,
1258        })?;
1259
1260    let workflow: YamlWorkflow =
1261        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1262            path: workflow_path.display().to_string(),
1263            source,
1264        })?;
1265
1266    run_workflow_yaml(&workflow, workflow_input, executor).await
1267}
1268
1269pub async fn run_email_workflow_yaml_file(
1270    workflow_path: &Path,
1271    email_text: &str,
1272    executor: &dyn YamlWorkflowLlmExecutor,
1273) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1274    let workflow_input = json!({ "email_text": email_text });
1275    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1276}
1277
1278pub async fn run_workflow_yaml_file_with_client(
1279    workflow_path: &Path,
1280    workflow_input: &Value,
1281    client: &SimpleAgentsClient,
1282) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1283    let contents =
1284        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1285            path: workflow_path.display().to_string(),
1286            source,
1287        })?;
1288
1289    let workflow: YamlWorkflow =
1290        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1291            path: workflow_path.display().to_string(),
1292            source,
1293        })?;
1294
1295    run_workflow_yaml_with_client(&workflow, workflow_input, client).await
1296}
1297
1298pub async fn run_email_workflow_yaml_file_with_client(
1299    workflow_path: &Path,
1300    email_text: &str,
1301    client: &SimpleAgentsClient,
1302) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1303    let workflow_input = json!({ "email_text": email_text });
1304    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1305}
1306
1307pub async fn run_workflow_yaml_with_client(
1308    workflow: &YamlWorkflow,
1309    workflow_input: &Value,
1310    client: &SimpleAgentsClient,
1311) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1312    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
1313}
1314
1315pub async fn run_email_workflow_yaml_with_client(
1316    workflow: &YamlWorkflow,
1317    email_text: &str,
1318    client: &SimpleAgentsClient,
1319) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1320    let workflow_input = json!({ "email_text": email_text });
1321    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1322}
1323
1324pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1325    workflow_path: &Path,
1326    workflow_input: &Value,
1327    client: &SimpleAgentsClient,
1328    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1329) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1330    let contents =
1331        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1332            path: workflow_path.display().to_string(),
1333            source,
1334        })?;
1335
1336    let workflow: YamlWorkflow =
1337        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1338            path: workflow_path.display().to_string(),
1339            source,
1340        })?;
1341
1342    run_workflow_yaml_with_client_and_custom_worker(
1343        &workflow,
1344        workflow_input,
1345        client,
1346        custom_worker,
1347    )
1348    .await
1349}
1350
1351pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1352    workflow_path: &Path,
1353    email_text: &str,
1354    client: &SimpleAgentsClient,
1355    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1356) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1357    let workflow_input = json!({ "email_text": email_text });
1358    run_workflow_yaml_file_with_client_and_custom_worker(
1359        workflow_path,
1360        &workflow_input,
1361        client,
1362        custom_worker,
1363    )
1364    .await
1365}
1366
1367pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1368    workflow_path: &Path,
1369    workflow_input: &Value,
1370    client: &SimpleAgentsClient,
1371    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1372    event_sink: Option<&dyn YamlWorkflowEventSink>,
1373) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1374    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1375        workflow_path,
1376        workflow_input,
1377        client,
1378        custom_worker,
1379        event_sink,
1380        &YamlWorkflowRunOptions::default(),
1381    )
1382    .await
1383}
1384
1385pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1386    workflow_path: &Path,
1387    workflow_input: &Value,
1388    client: &SimpleAgentsClient,
1389    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1390    event_sink: Option<&dyn YamlWorkflowEventSink>,
1391    options: &YamlWorkflowRunOptions,
1392) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1393    let contents =
1394        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1395            path: workflow_path.display().to_string(),
1396            source,
1397        })?;
1398
1399    let workflow: YamlWorkflow =
1400        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1401            path: workflow_path.display().to_string(),
1402            source,
1403        })?;
1404
1405    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1406        &workflow,
1407        workflow_input,
1408        client,
1409        custom_worker,
1410        event_sink,
1411        options,
1412    )
1413    .await
1414}
1415
1416pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1417    workflow_path: &Path,
1418    email_text: &str,
1419    client: &SimpleAgentsClient,
1420    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1421    event_sink: Option<&dyn YamlWorkflowEventSink>,
1422) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1423    let workflow_input = json!({ "email_text": email_text });
1424    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1425        workflow_path,
1426        &workflow_input,
1427        client,
1428        custom_worker,
1429        event_sink,
1430    )
1431    .await
1432}
1433
1434pub async fn run_workflow_yaml_with_client_and_custom_worker(
1435    workflow: &YamlWorkflow,
1436    workflow_input: &Value,
1437    client: &SimpleAgentsClient,
1438    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1439) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1440    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1441        workflow,
1442        workflow_input,
1443        client,
1444        custom_worker,
1445        None,
1446        &YamlWorkflowRunOptions::default(),
1447    )
1448    .await
1449}
1450
1451pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
1452    workflow: &YamlWorkflow,
1453    email_text: &str,
1454    client: &SimpleAgentsClient,
1455    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1456) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1457    let workflow_input = json!({ "email_text": email_text });
1458    run_workflow_yaml_with_client_and_custom_worker(
1459        workflow,
1460        &workflow_input,
1461        client,
1462        custom_worker,
1463    )
1464    .await
1465}
1466
1467pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
1468    workflow: &YamlWorkflow,
1469    workflow_input: &Value,
1470    client: &SimpleAgentsClient,
1471    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1472    event_sink: Option<&dyn YamlWorkflowEventSink>,
1473) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1474    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1475        workflow,
1476        workflow_input,
1477        client,
1478        custom_worker,
1479        event_sink,
1480        &YamlWorkflowRunOptions::default(),
1481    )
1482    .await
1483}
1484
1485pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1486    workflow: &YamlWorkflow,
1487    workflow_input: &Value,
1488    client: &SimpleAgentsClient,
1489    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1490    event_sink: Option<&dyn YamlWorkflowEventSink>,
1491    options: &YamlWorkflowRunOptions,
1492) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1493    struct BorrowedClientExecutor<'a> {
1494        client: &'a SimpleAgentsClient,
1495        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
1496    }
1497
1498    #[async_trait]
1499    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
1500        async fn complete_structured(
1501            &self,
1502            request: YamlLlmExecutionRequest,
1503            event_sink: Option<&dyn YamlWorkflowEventSink>,
1504        ) -> Result<YamlLlmExecutionResult, String> {
1505            let messages = if let Some(mut history) = request.messages.clone() {
1506                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
1507                    history.push(Message::user(&request.prompt));
1508                }
1509                history
1510            } else {
1511                vec![
1512                    Message::system("You execute workflow classification steps."),
1513                    Message::user(&request.prompt),
1514                ]
1515            };
1516
1517            if !request.tools.is_empty() {
1518                if request.stream {
1519                    return Err(
1520                        "llm_call.stream=true is not supported when llm_call.tools are configured"
1521                            .to_string(),
1522                    );
1523                }
1524
1525                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
1526                let mut conversation = messages;
1527                let mut usage_total: Option<YamlLlmTokenUsage> = None;
1528
1529                for roundtrip in 0..=request.max_tool_roundtrips {
1530                    let mut builder = CompletionRequest::builder()
1531                        .model(&request.model)
1532                        .messages(conversation.clone())
1533                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
1534
1535                    if let Some(choice) = request.tool_choice.clone() {
1536                        builder = builder.tool_choice(choice);
1537                    }
1538
1539                    let completion_request = builder
1540                        .build()
1541                        .map_err(|error| format!("failed to build completion request: {error}"))?;
1542
1543                    let outcome = self
1544                        .client
1545                        .complete(&completion_request, CompletionOptions::default())
1546                        .await
1547                        .map_err(|error| error.to_string())?;
1548
1549                    let response = match outcome {
1550                        CompletionOutcome::Response(response) => response,
1551                        CompletionOutcome::HealedJson(healed) => healed.response,
1552                        CompletionOutcome::CoercedSchema(coerced) => coerced.response,
1553                        CompletionOutcome::Stream(_) => {
1554                            return Err(
1555                                "streaming outcome is unsupported for tool-enabled llm_call"
1556                                    .to_string(),
1557                            )
1558                        }
1559                    };
1560
1561                    if let Some(usage) = usage_total.as_mut() {
1562                        usage.prompt_tokens += response.usage.prompt_tokens;
1563                        usage.completion_tokens += response.usage.completion_tokens;
1564                        usage.total_tokens += response.usage.total_tokens;
1565                    } else {
1566                        usage_total = Some(YamlLlmTokenUsage {
1567                            prompt_tokens: response.usage.prompt_tokens,
1568                            completion_tokens: response.usage.completion_tokens,
1569                            total_tokens: response.usage.total_tokens,
1570                            thinking_tokens: None,
1571                        });
1572                    }
1573
1574                    let choice = response
1575                        .choices
1576                        .first()
1577                        .ok_or_else(|| "completion returned no choices".to_string())?;
1578
1579                    if choice.finish_reason != FinishReason::ToolCalls {
1580                        let content = response.content().ok_or_else(|| {
1581                            "completion returned empty content for structured payload".to_string()
1582                        })?;
1583                        let payload: Value = serde_json::from_str(content).map_err(|error| {
1584                            format!("failed to parse structured completion JSON: {error}")
1585                        })?;
1586                        return Ok(YamlLlmExecutionResult {
1587                            payload,
1588                            usage: usage_total,
1589                            ttft_ms: None,
1590                            tool_calls: tool_traces,
1591                        });
1592                    }
1593
1594                    if roundtrip >= request.max_tool_roundtrips {
1595                        return Err(format!(
1596                            "tool call roundtrip limit reached for node '{}' (max={})",
1597                            request.node_id, request.max_tool_roundtrips
1598                        ));
1599                    }
1600
1601                    let tool_calls: Vec<ToolCall> =
1602                        choice.message.tool_calls.clone().ok_or_else(|| {
1603                            "finish_reason=tool_calls but no tool calls found".to_string()
1604                        })?;
1605
1606                    conversation.push(choice.message.clone());
1607
1608                    for tool_call in tool_calls {
1609                        let tool_call_id = tool_call.id.clone();
1610                        let tool_name = tool_call.function.name.clone();
1611                        let tool_started = Instant::now();
1612                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
1613                            .map_err(|error| {
1614                                format!(
1615                                    "tool '{}' arguments must be valid JSON: {}",
1616                                    tool_name, error
1617                                )
1618                            })?;
1619
1620                        if request.tool_trace_mode != YamlToolTraceMode::Off {
1621                            if let Some(sink) = event_sink {
1622                                sink.emit(&YamlWorkflowEvent {
1623                                    event_type: "node_tool_call_requested".to_string(),
1624                                    node_id: Some(request.node_id.clone()),
1625                                    step_id: Some(request.node_id.clone()),
1626                                    node_kind: Some("llm_call".to_string()),
1627                                    streamable: Some(false),
1628                                    message: Some(format!(
1629                                        "tool call requested: {}",
1630                                        tool_name
1631                                    )),
1632                                    delta: None,
1633                                    token_kind: None,
1634                                    is_terminal_node_token: None,
1635                                    elapsed_ms: None,
1636                                    metadata: Some(json!({
1637                                        "tool_call_id": tool_call_id.clone(),
1638                                        "tool_name": tool_name.clone(),
1639                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
1640                                    })),
1641                                });
1642                            }
1643                        }
1644
1645                        let tool_output_result = if let Some(custom_worker) = self.custom_worker {
1646                            custom_worker
1647                                .execute(
1648                                    tool_name.as_str(),
1649                                    &arguments,
1650                                    request.email_text.as_str(),
1651                                    &request.execution_context,
1652                                )
1653                                .await
1654                        } else {
1655                            mock_custom_worker_output(tool_name.as_str(), &arguments)
1656                                .map_err(|error| error.to_string())
1657                        };
1658
1659                        let Some(tool_config) = request
1660                            .tools
1661                            .iter()
1662                            .find(|tool| tool.definition.function.name == tool_name)
1663                        else {
1664                            return Err(format!("model requested unknown tool '{}'", tool_name));
1665                        };
1666
1667                        let tool_output = match tool_output_result {
1668                            Ok(output) => output,
1669                            Err(message) => {
1670                                let elapsed_ms = tool_started.elapsed().as_millis();
1671                                if request.tool_trace_mode != YamlToolTraceMode::Off {
1672                                    if let Some(sink) = event_sink {
1673                                        sink.emit(&YamlWorkflowEvent {
1674                                            event_type: "node_tool_call_failed".to_string(),
1675                                            node_id: Some(request.node_id.clone()),
1676                                            step_id: Some(request.node_id.clone()),
1677                                            node_kind: Some("llm_call".to_string()),
1678                                            streamable: Some(false),
1679                                            message: Some(message.clone()),
1680                                            delta: None,
1681                                            token_kind: None,
1682                                            is_terminal_node_token: None,
1683                                            elapsed_ms: Some(elapsed_ms),
1684                                            metadata: Some(json!({
1685                                                "tool_call_id": tool_call_id.clone(),
1686                                                "tool_name": tool_name.clone(),
1687                                            })),
1688                                        });
1689                                    }
1690                                }
1691                                tool_traces.push(YamlToolCallTrace {
1692                                    id: tool_call_id.clone(),
1693                                    name: tool_name.clone(),
1694                                    arguments,
1695                                    output: None,
1696                                    status: "error".to_string(),
1697                                    elapsed_ms,
1698                                    error: Some(message.clone()),
1699                                });
1700                                return Err(format!("tool '{}' failed: {}", tool_name, message));
1701                            }
1702                        };
1703
1704                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
1705                            validate_schema_instance(output_schema, &tool_output).map_err(
1706                                |message| {
1707                                    format!(
1708                                        "tool '{}' output failed schema validation: {}",
1709                                        tool_name, message
1710                                    )
1711                                },
1712                            )?;
1713                        }
1714
1715                        let elapsed_ms = tool_started.elapsed().as_millis();
1716                        if request.tool_trace_mode != YamlToolTraceMode::Off {
1717                            if let Some(sink) = event_sink {
1718                                sink.emit(&YamlWorkflowEvent {
1719                                    event_type: "node_tool_call_completed".to_string(),
1720                                    node_id: Some(request.node_id.clone()),
1721                                    step_id: Some(request.node_id.clone()),
1722                                    node_kind: Some("llm_call".to_string()),
1723                                    streamable: Some(false),
1724                                    message: Some(format!(
1725                                        "tool call completed: {}",
1726                                        tool_name
1727                                    )),
1728                                    delta: None,
1729                                    token_kind: None,
1730                                    is_terminal_node_token: None,
1731                                    elapsed_ms: Some(elapsed_ms),
1732                                    metadata: Some(json!({
1733                                        "tool_call_id": tool_call_id.clone(),
1734                                        "tool_name": tool_name.clone(),
1735                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
1736                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
1737                                    })),
1738                                });
1739                            }
1740                        }
1741
1742                        tool_traces.push(YamlToolCallTrace {
1743                            id: tool_call_id.clone(),
1744                            name: tool_name.clone(),
1745                            arguments: arguments.clone(),
1746                            output: Some(tool_output.clone()),
1747                            status: "ok".to_string(),
1748                            elapsed_ms,
1749                            error: None,
1750                        });
1751
1752                        conversation.push(Message::tool(
1753                            serde_json::to_string(&tool_output).map_err(|error| {
1754                                format!("failed to serialize tool output: {error}")
1755                            })?,
1756                            tool_call_id,
1757                        ));
1758                    }
1759
1760                    if request.tool_trace_mode != YamlToolTraceMode::Off {
1761                        if let Some(sink) = event_sink {
1762                            sink.emit(&YamlWorkflowEvent {
1763                                event_type: "node_tool_roundtrip_completed".to_string(),
1764                                node_id: Some(request.node_id.clone()),
1765                                step_id: Some(request.node_id.clone()),
1766                                node_kind: Some("llm_call".to_string()),
1767                                streamable: Some(false),
1768                                message: Some(format!(
1769                                    "tool roundtrip {} completed",
1770                                    roundtrip + 1
1771                                )),
1772                                delta: None,
1773                                token_kind: None,
1774                                is_terminal_node_token: None,
1775                                elapsed_ms: None,
1776                                metadata: Some(json!({
1777                                    "roundtrip": roundtrip + 1,
1778                                    "max_tool_roundtrips": request.max_tool_roundtrips,
1779                                })),
1780                            });
1781                        }
1782                    }
1783                }
1784
1785                return Err(format!(
1786                    "tool-enabled llm_call '{}' exhausted loop without final payload",
1787                    request.node_id
1788                ));
1789            }
1790
1791            let mut builder = CompletionRequest::builder()
1792                .model(&request.model)
1793                .messages(messages);
1794
1795            if request.stream {
1796                builder = builder.stream(true);
1797            }
1798
1799            let completion_request = builder
1800                .build()
1801                .map_err(|error| format!("failed to build completion request: {error}"))?;
1802
1803            let completion_options = if request.heal && !request.stream {
1804                CompletionOptions {
1805                    mode: CompletionMode::HealedJson,
1806                }
1807            } else {
1808                CompletionOptions::default()
1809            };
1810
1811            let outcome = self
1812                .client
1813                .complete(&completion_request, completion_options)
1814                .await
1815                .map_err(|error| error.to_string())?;
1816
1817            match outcome {
1818                CompletionOutcome::Stream(mut stream) => {
1819                    let mut aggregated = String::new();
1820                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
1821                    let stream_started = Instant::now();
1822                    let mut ttft_ms: Option<u128> = None;
1823                    let mut delta_filter = StructuredJsonDeltaFilter::default();
1824                    let include_raw_debug = include_raw_stream_debug_events();
1825                    let mut json_text_formatter = if request.stream_json_as_text {
1826                        Some(StreamJsonAsTextFormatter::default())
1827                    } else {
1828                        None
1829                    };
1830                    while let Some(chunk_result) = stream.next().await {
1831                        if event_sink_is_cancelled(event_sink) {
1832                            return Err(workflow_event_sink_cancelled_message().to_string());
1833                        }
1834                        let chunk = chunk_result.map_err(|error| error.to_string())?;
1835                        if let Some(usage) = chunk.usage {
1836                            final_stream_usage = Some(usage);
1837                        }
1838                        if let Some(choice) = chunk.choices.first() {
1839                            if ttft_ms.is_none()
1840                                && (choice
1841                                    .delta
1842                                    .content
1843                                    .as_ref()
1844                                    .is_some_and(|delta| !delta.is_empty())
1845                                    || choice
1846                                        .delta
1847                                        .reasoning_content
1848                                        .as_ref()
1849                                        .is_some_and(|delta| !delta.is_empty()))
1850                            {
1851                                ttft_ms = Some(stream_started.elapsed().as_millis());
1852                            }
1853                            if include_raw_debug {
1854                                if let Some(reasoning_delta) =
1855                                    choice.delta.reasoning_content.as_ref()
1856                                {
1857                                    if let Some(sink) = event_sink {
1858                                        sink.emit(&YamlWorkflowEvent {
1859                                            event_type: "node_stream_thinking_delta".to_string(),
1860                                            node_id: Some(request.node_id.clone()),
1861                                            step_id: Some(request.node_id.clone()),
1862                                            node_kind: Some("llm_call".to_string()),
1863                                            streamable: Some(true),
1864                                            message: None,
1865                                            delta: Some(reasoning_delta.clone()),
1866                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
1867                                            is_terminal_node_token: Some(request.is_terminal_node),
1868                                            elapsed_ms: None,
1869                                            metadata: None,
1870                                        });
1871                                    }
1872                                }
1873                            }
1874                            if let Some(delta) = choice.delta.content.clone() {
1875                                aggregated.push_str(delta.as_str());
1876                                let (output_delta, thinking_delta) =
1877                                    delta_filter.split(delta.as_str());
1878                                let rendered_output_delta = if let Some(output_chunk) = output_delta
1879                                {
1880                                    if let Some(formatter) = json_text_formatter.as_mut() {
1881                                        formatter.push(output_chunk.as_str());
1882                                        formatter.emit_if_ready(delta_filter.completed())
1883                                    } else {
1884                                        Some(output_chunk)
1885                                    }
1886                                } else {
1887                                    None
1888                                };
1889                                if include_raw_debug {
1890                                    if let Some(sink) = event_sink {
1891                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
1892                                            sink.emit(&YamlWorkflowEvent {
1893                                                event_type: "node_stream_thinking_delta"
1894                                                    .to_string(),
1895                                                node_id: Some(request.node_id.clone()),
1896                                                step_id: Some(request.node_id.clone()),
1897                                                node_kind: Some("llm_call".to_string()),
1898                                                streamable: Some(true),
1899                                                message: None,
1900                                                delta: Some(raw_thinking_delta.clone()),
1901                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
1902                                                is_terminal_node_token: Some(
1903                                                    request.is_terminal_node,
1904                                                ),
1905                                                elapsed_ms: None,
1906                                                metadata: None,
1907                                            });
1908                                        }
1909                                        if let Some(raw_output_delta) =
1910                                            rendered_output_delta.as_ref()
1911                                        {
1912                                            sink.emit(&YamlWorkflowEvent {
1913                                                event_type: "node_stream_output_delta".to_string(),
1914                                                node_id: Some(request.node_id.clone()),
1915                                                step_id: Some(request.node_id.clone()),
1916                                                node_kind: Some("llm_call".to_string()),
1917                                                streamable: Some(true),
1918                                                message: None,
1919                                                delta: Some(raw_output_delta.clone()),
1920                                                token_kind: Some(YamlWorkflowTokenKind::Output),
1921                                                is_terminal_node_token: Some(
1922                                                    request.is_terminal_node,
1923                                                ),
1924                                                elapsed_ms: None,
1925                                                metadata: None,
1926                                            });
1927                                        }
1928                                    }
1929                                }
1930                                if let Some(filtered_delta) = rendered_output_delta {
1931                                    if let Some(sink) = event_sink {
1932                                        sink.emit(&YamlWorkflowEvent {
1933                                            event_type: "node_stream_delta".to_string(),
1934                                            node_id: Some(request.node_id.clone()),
1935                                            step_id: Some(request.node_id.clone()),
1936                                            node_kind: Some("llm_call".to_string()),
1937                                            streamable: Some(true),
1938                                            message: None,
1939                                            delta: Some(filtered_delta),
1940                                            token_kind: Some(YamlWorkflowTokenKind::Output),
1941                                            is_terminal_node_token: Some(request.is_terminal_node),
1942                                            elapsed_ms: None,
1943                                            metadata: None,
1944                                        });
1945                                    }
1946                                }
1947                            }
1948                        }
1949
1950                        if event_sink_is_cancelled(event_sink) {
1951                            return Err(workflow_event_sink_cancelled_message().to_string());
1952                        }
1953                    }
1954
1955                    let resolved =
1956                        parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
1957                    if let Some(confidence) = resolved.heal_confidence {
1958                        if let Some(sink) = event_sink {
1959                            sink.emit(&YamlWorkflowEvent {
1960                                event_type: "node_healed".to_string(),
1961                                node_id: Some(request.node_id.clone()),
1962                                step_id: Some(request.node_id.clone()),
1963                                node_kind: Some("llm_call".to_string()),
1964                                streamable: Some(true),
1965                                message: Some(format!(
1966                                    "healed streamed structured response confidence={confidence}"
1967                                )),
1968                                delta: None,
1969                                token_kind: None,
1970                                is_terminal_node_token: None,
1971                                elapsed_ms: None,
1972                                metadata: None,
1973                            });
1974                        }
1975                    }
1976
1977                    Ok(YamlLlmExecutionResult {
1978                        payload: resolved.payload,
1979                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
1980                            prompt_tokens: usage.prompt_tokens,
1981                            completion_tokens: usage.completion_tokens,
1982                            total_tokens: usage.total_tokens,
1983                            thinking_tokens: None,
1984                        }),
1985                        ttft_ms,
1986                        tool_calls: Vec::new(),
1987                    })
1988                }
1989                CompletionOutcome::Response(response) => {
1990                    let content = response
1991                        .content()
1992                        .ok_or_else(|| "completion returned empty content".to_string())?;
1993                    let payload = serde_json::from_str(content).map_err(|error| {
1994                        format!("failed to parse structured completion JSON: {error}")
1995                    })?;
1996
1997                    Ok(YamlLlmExecutionResult {
1998                        payload,
1999                        usage: Some(YamlLlmTokenUsage {
2000                            prompt_tokens: response.usage.prompt_tokens,
2001                            completion_tokens: response.usage.completion_tokens,
2002                            total_tokens: response.usage.total_tokens,
2003                            thinking_tokens: None,
2004                        }),
2005                        ttft_ms: None,
2006                        tool_calls: Vec::new(),
2007                    })
2008                }
2009                CompletionOutcome::HealedJson(healed) => {
2010                    if let Some(sink) = event_sink {
2011                        sink.emit(&YamlWorkflowEvent {
2012                            event_type: "node_healed".to_string(),
2013                            node_id: Some(request.node_id.clone()),
2014                            step_id: Some(request.node_id.clone()),
2015                            node_kind: Some("llm_call".to_string()),
2016                            streamable: Some(request.stream),
2017                            message: Some(format!(
2018                                "healed structured response confidence={}",
2019                                healed.parsed.confidence
2020                            )),
2021                            delta: None,
2022                            token_kind: None,
2023                            is_terminal_node_token: None,
2024                            elapsed_ms: None,
2025                            metadata: None,
2026                        });
2027                    }
2028                    Ok(YamlLlmExecutionResult {
2029                        payload: healed.parsed.value,
2030                        usage: Some(YamlLlmTokenUsage {
2031                            prompt_tokens: healed.response.usage.prompt_tokens,
2032                            completion_tokens: healed.response.usage.completion_tokens,
2033                            total_tokens: healed.response.usage.total_tokens,
2034                            thinking_tokens: None,
2035                        }),
2036                        ttft_ms: None,
2037                        tool_calls: Vec::new(),
2038                    })
2039                }
2040                CompletionOutcome::CoercedSchema(coerced) => Ok(YamlLlmExecutionResult {
2041                    payload: coerced.coerced.value,
2042                    usage: Some(YamlLlmTokenUsage {
2043                        prompt_tokens: coerced.response.usage.prompt_tokens,
2044                        completion_tokens: coerced.response.usage.completion_tokens,
2045                        total_tokens: coerced.response.usage.total_tokens,
2046                        thinking_tokens: None,
2047                    }),
2048                    ttft_ms: None,
2049                    tool_calls: Vec::new(),
2050                }),
2051            }
2052        }
2053    }
2054
2055    let executor = BorrowedClientExecutor {
2056        client,
2057        custom_worker,
2058    };
2059    run_workflow_yaml_with_custom_worker_and_events_and_options(
2060        workflow,
2061        workflow_input,
2062        &executor,
2063        custom_worker,
2064        event_sink,
2065        options,
2066    )
2067    .await
2068}
2069
2070pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
2071    workflow: &YamlWorkflow,
2072    email_text: &str,
2073    client: &SimpleAgentsClient,
2074    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2075    event_sink: Option<&dyn YamlWorkflowEventSink>,
2076) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2077    let workflow_input = json!({ "email_text": email_text });
2078    run_workflow_yaml_with_client_and_custom_worker_and_events(
2079        workflow,
2080        &workflow_input,
2081        client,
2082        custom_worker,
2083        event_sink,
2084    )
2085    .await
2086}
2087
2088pub async fn run_workflow_yaml(
2089    workflow: &YamlWorkflow,
2090    workflow_input: &Value,
2091    executor: &dyn YamlWorkflowLlmExecutor,
2092) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2093    run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
2094        .await
2095}
2096
2097pub async fn run_email_workflow_yaml(
2098    workflow: &YamlWorkflow,
2099    email_text: &str,
2100    executor: &dyn YamlWorkflowLlmExecutor,
2101) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2102    let workflow_input = json!({ "email_text": email_text });
2103    run_workflow_yaml(workflow, &workflow_input, executor).await
2104}
2105
2106pub async fn run_workflow_yaml_with_custom_worker(
2107    workflow: &YamlWorkflow,
2108    workflow_input: &Value,
2109    executor: &dyn YamlWorkflowLlmExecutor,
2110    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2111) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2112    run_workflow_yaml_with_custom_worker_and_events(
2113        workflow,
2114        workflow_input,
2115        executor,
2116        custom_worker,
2117        None,
2118    )
2119    .await
2120}
2121
2122pub async fn run_email_workflow_yaml_with_custom_worker(
2123    workflow: &YamlWorkflow,
2124    email_text: &str,
2125    executor: &dyn YamlWorkflowLlmExecutor,
2126    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2127) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2128    let workflow_input = json!({ "email_text": email_text });
2129    run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
2130}
2131
2132pub async fn run_workflow_yaml_with_custom_worker_and_events(
2133    workflow: &YamlWorkflow,
2134    workflow_input: &Value,
2135    executor: &dyn YamlWorkflowLlmExecutor,
2136    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2137    event_sink: Option<&dyn YamlWorkflowEventSink>,
2138) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2139    run_workflow_yaml_with_custom_worker_and_events_and_options(
2140        workflow,
2141        workflow_input,
2142        executor,
2143        custom_worker,
2144        event_sink,
2145        &YamlWorkflowRunOptions::default(),
2146    )
2147    .await
2148}
2149
2150pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
2151    workflow: &YamlWorkflow,
2152    workflow_input: &Value,
2153    executor: &dyn YamlWorkflowLlmExecutor,
2154    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2155    event_sink: Option<&dyn YamlWorkflowEventSink>,
2156    options: &YamlWorkflowRunOptions,
2157) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2158    if !workflow_input.is_object() {
2159        return Err(YamlWorkflowRunError::InvalidInput {
2160            message: "workflow input must be a JSON object".to_string(),
2161        });
2162    }
2163
2164    let email_text = workflow_input
2165        .get("email_text")
2166        .and_then(Value::as_str)
2167        .unwrap_or_default();
2168
2169    let diagnostics = verify_yaml_workflow(workflow);
2170    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
2171        .iter()
2172        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
2173        .cloned()
2174        .collect();
2175    if !errors.is_empty() {
2176        return Err(YamlWorkflowRunError::Validation {
2177            diagnostics_count: errors.len(),
2178            diagnostics: errors,
2179        });
2180    }
2181
2182    if let Some(output) =
2183        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
2184            .await?
2185    {
2186        return Ok(output);
2187    }
2188
2189    let tracer = workflow_tracer();
2190    let parent_trace_context = trace_context_from_options(options);
2191    let (workflow_trace_context, mut workflow_span) = tracer.start_span(
2192        "workflow.run",
2193        SpanKind::Workflow,
2194        parent_trace_context.as_ref(),
2195    );
2196    let trace_id = if options.telemetry.enabled {
2197        let value = resolve_trace_id(options, &workflow_trace_context);
2198        workflow_span.set_attribute("trace_id", value.as_str());
2199        apply_trace_tenant_attributes(workflow_span.as_mut(), options);
2200        Some(value)
2201    } else {
2202        None
2203    };
2204
2205    if workflow.nodes.is_empty() {
2206        return Err(YamlWorkflowRunError::EmptyNodes {
2207            workflow_id: workflow.id.clone(),
2208        });
2209    }
2210
2211    let node_map: HashMap<&str, &YamlNode> = workflow
2212        .nodes
2213        .iter()
2214        .map(|node| (node.id.as_str(), node))
2215        .collect();
2216    if !node_map.contains_key(workflow.entry_node.as_str()) {
2217        return Err(YamlWorkflowRunError::MissingEntry {
2218            entry_node: workflow.entry_node.clone(),
2219        });
2220    }
2221
2222    let edge_map: HashMap<&str, &str> = workflow
2223        .edges
2224        .iter()
2225        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
2226        .collect();
2227
2228    let mut current = workflow.entry_node.clone();
2229    let mut trace = Vec::new();
2230    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
2231    let mut globals = serde_json::Map::new();
2232    let mut step_timings = Vec::new();
2233    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
2234    let mut token_totals = YamlTokenTotals::default();
2235    let mut workflow_ttft_ms: Option<u128> = None;
2236    let started = Instant::now();
2237
2238    if let Some(sink) = event_sink {
2239        sink.emit(&YamlWorkflowEvent {
2240            event_type: "workflow_started".to_string(),
2241            node_id: None,
2242            step_id: None,
2243            node_kind: None,
2244            streamable: None,
2245            message: Some(format!("workflow_id={}", workflow.id)),
2246            delta: None,
2247            token_kind: None,
2248            is_terminal_node_token: None,
2249            elapsed_ms: Some(0),
2250            metadata: None,
2251        });
2252    }
2253
2254    if event_sink_is_cancelled(event_sink) {
2255        return Err(YamlWorkflowRunError::EventSinkCancelled {
2256            message: workflow_event_sink_cancelled_message().to_string(),
2257        });
2258    }
2259
2260    loop {
2261        if event_sink_is_cancelled(event_sink) {
2262            return Err(YamlWorkflowRunError::EventSinkCancelled {
2263                message: workflow_event_sink_cancelled_message().to_string(),
2264            });
2265        }
2266
2267        let node =
2268            *node_map
2269                .get(current.as_str())
2270                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
2271                    node_id: current.clone(),
2272                })?;
2273
2274        trace.push(node.id.clone());
2275        let step_started = Instant::now();
2276
2277        let mut node_span = if options.telemetry.enabled {
2278            let (_, mut span) = tracer.start_span(
2279                "workflow.node.execute",
2280                SpanKind::Node,
2281                Some(&workflow_trace_context),
2282            );
2283            span.set_attribute("trace_id", trace_id.as_deref().unwrap_or_default());
2284            span.set_attribute("node_id", node.id.as_str());
2285            span.set_attribute("node_kind", node.kind_name());
2286            Some(span)
2287        } else {
2288            None
2289        };
2290
2291        let node_streamable = node
2292            .node_type
2293            .llm_call
2294            .as_ref()
2295            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
2296        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();
2297
2298        if let Some(sink) = event_sink {
2299            sink.emit(&YamlWorkflowEvent {
2300                event_type: "node_started".to_string(),
2301                node_id: Some(node.id.clone()),
2302                step_id: Some(node.id.clone()),
2303                node_kind: Some(node.kind_name().to_string()),
2304                streamable: node_streamable,
2305                message: if node_streamable == Some(false) {
2306                    Some("Node is not streamable; status events only".to_string())
2307                } else {
2308                    None
2309                },
2310                delta: None,
2311                token_kind: None,
2312                is_terminal_node_token: None,
2313                elapsed_ms: Some(workflow_elapsed_before_node_ms),
2314                metadata: None,
2315            });
2316        }
2317
2318        if event_sink_is_cancelled(event_sink) {
2319            return Err(YamlWorkflowRunError::EventSinkCancelled {
2320                message: workflow_event_sink_cancelled_message().to_string(),
2321            });
2322        }
2323
2324        let mut node_usage: Option<YamlLlmTokenUsage> = None;
2325        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
2326        let next = if let Some(llm) = &node.node_type.llm_call {
2327            let prompt_template = node
2328                .config
2329                .as_ref()
2330                .and_then(|cfg| cfg.prompt.as_deref())
2331                .unwrap_or_default();
2332            let context = json!({
2333                "input": workflow_input,
2334                "nodes": outputs,
2335                "globals": Value::Object(globals.clone())
2336            });
2337            let messages = if let Some(path) = llm.messages_path.as_deref() {
2338                Some(
2339                    parse_messages_from_context(path, &context).map_err(|message| {
2340                        YamlWorkflowRunError::Llm {
2341                            node_id: node.id.clone(),
2342                            message,
2343                        }
2344                    })?,
2345                )
2346            } else {
2347                None
2348            };
2349            let prompt_bindings = collect_template_bindings(prompt_template, &context);
2350            let prompt = interpolate_template(prompt_template, &context);
2351            let schema = llm_output_schema_for_node(node);
2352
2353            let request = YamlLlmExecutionRequest {
2354                node_id: node.id.clone(),
2355                is_terminal_node,
2356                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
2357                model: llm.model.clone(),
2358                messages,
2359                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
2360                prompt,
2361                prompt_template: prompt_template.to_string(),
2362                prompt_bindings,
2363                schema,
2364                stream: llm.stream.unwrap_or(false),
2365                heal: llm.heal.unwrap_or(false),
2366                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
2367                    node_id: node.id.clone(),
2368                    message,
2369                })?,
2370                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
2371                    YamlWorkflowRunError::Llm {
2372                        node_id: node.id.clone(),
2373                        message,
2374                    }
2375                })?,
2376                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
2377                tool_calls_global_key: llm.tool_calls_global_key.clone(),
2378                tool_trace_mode: options.telemetry.tool_trace_mode,
2379                execution_context: context.clone(),
2380                email_text: email_text.to_string(),
2381            };
2382
2383            if let Some(span) = node_span.as_mut() {
2384                span.set_attribute(
2385                    "node_input",
2386                    payload_for_span(options.telemetry.payload_mode, &context).as_str(),
2387                );
2388            }
2389
2390            if let Some(sink) = event_sink {
2391                sink.emit(&YamlWorkflowEvent {
2392                    event_type: "node_llm_input_resolved".to_string(),
2393                    node_id: Some(node.id.clone()),
2394                    step_id: Some(node.id.clone()),
2395                    node_kind: Some("llm_call".to_string()),
2396                    streamable: Some(request.stream),
2397                    message: Some("resolved llm input for telemetry".to_string()),
2398                    delta: None,
2399                    token_kind: None,
2400                    is_terminal_node_token: None,
2401                    elapsed_ms: Some(started.elapsed().as_millis()),
2402                    metadata: Some(json!({
2403                        "model": request.model.clone(),
2404                        "stream_requested": request.stream,
2405                        "stream_json_as_text": request.stream_json_as_text,
2406                        "heal_requested": request.heal,
2407                        "effective_stream": request.stream,
2408                        "prompt_template": request.prompt_template.clone(),
2409                        "prompt": request.prompt.clone(),
2410                        "schema": request.schema.clone(),
2411                        "bindings": request.prompt_bindings.clone(),
2412                        "tools_count": request.tools.len(),
2413                        "max_tool_roundtrips": request.max_tool_roundtrips,
2414                    })),
2415                });
2416            }
2417
2418            if event_sink_is_cancelled(event_sink) {
2419                return Err(YamlWorkflowRunError::EventSinkCancelled {
2420                    message: workflow_event_sink_cancelled_message().to_string(),
2421                });
2422            }
2423
2424            let llm_result = executor
2425                .complete_structured(request, event_sink)
2426                .await
2427                .map_err(|message| YamlWorkflowRunError::Llm {
2428                    node_id: node.id.clone(),
2429                    message,
2430                })?;
2431
2432            if let Some(usage) = llm_result.usage.as_ref() {
2433                token_totals.add_usage(usage);
2434            }
2435            if workflow_ttft_ms.is_none() {
2436                workflow_ttft_ms = llm_result
2437                    .ttft_ms
2438                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
2439            }
2440            node_usage = llm_result.usage;
2441
2442            let payload = llm_result.payload;
2443            let tool_calls = llm_result.tool_calls;
2444
2445            if !payload.is_object() {
2446                return Err(YamlWorkflowRunError::LlmPayloadNotObject {
2447                    node_id: node.id.clone(),
2448                });
2449            }
2450
2451            let mut node_output = json!({ "output": payload });
2452            if !tool_calls.is_empty() {
2453                if let Some(output_obj) = node_output.as_object_mut() {
2454                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
2455                }
2456            }
2457            outputs.insert(node.id.clone(), node_output);
2458            if let Some(span) = node_span.as_mut() {
2459                if let Some(output_payload) = outputs.get(node.id.as_str()) {
2460                    span.set_attribute(
2461                        "node_output",
2462                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
2463                    );
2464                }
2465            }
2466            apply_set_globals(node, &outputs, workflow_input, &mut globals);
2467            apply_update_globals(node, &outputs, workflow_input, &mut globals);
2468            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
2469                if let Some(node_tool_calls) = outputs
2470                    .get(node.id.as_str())
2471                    .and_then(|value| value.get("tool_calls"))
2472                    .cloned()
2473                {
2474                    globals.insert(global_key.clone(), node_tool_calls);
2475                }
2476            }
2477            edge_map
2478                .get(node.id.as_str())
2479                .map(|value| value.to_string())
2480        } else if let Some(switch) = &node.node_type.switch {
2481            let context = json!({
2482                "input": workflow_input,
2483                "nodes": outputs,
2484                "globals": Value::Object(globals.clone())
2485            });
2486            let mut chosen = Some(switch.default.clone());
2487            for branch in &switch.branches {
2488                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
2489                    chosen = Some(branch.target.clone());
2490                    break;
2491                }
2492            }
2493            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
2494                node_id: node.id.clone(),
2495            })?;
2496            Some(chosen)
2497        } else if let Some(custom) = &node.node_type.custom_worker {
2498            let payload = node
2499                .config
2500                .as_ref()
2501                .and_then(|cfg| cfg.payload.as_ref())
2502                .cloned()
2503                .unwrap_or_else(|| json!({}));
2504            let context = json!({
2505                "input": workflow_input,
2506                "nodes": outputs,
2507                "globals": Value::Object(globals.clone())
2508            });
2509
2510            if let Some(span) = node_span.as_mut() {
2511                span.set_attribute("handler_name", custom.handler.as_str());
2512                span.set_attribute(
2513                    "node_input",
2514                    payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
2515                );
2516            }
2517
2518            let mut handler_span_context: Option<TraceContext> = None;
2519            let mut handler_span = if options.telemetry.enabled {
2520                let (span_context, mut span) = tracer.start_span(
2521                    "handler.invoke",
2522                    SpanKind::Node,
2523                    Some(&workflow_trace_context),
2524                );
2525                handler_span_context = Some(span_context);
2526                span.set_attribute("trace_id", trace_id.as_deref().unwrap_or_default());
2527                span.set_attribute("handler_name", custom.handler.as_str());
2528                apply_trace_tenant_attributes(span.as_mut(), options);
2529                Some(span)
2530            } else {
2531                None
2532            };
2533
2534            let worker_trace_context = merged_trace_context_for_worker(
2535                handler_span_context.as_ref(),
2536                trace_id.as_deref(),
2537                options,
2538            );
2539            let worker_context = custom_worker_context_with_trace(
2540                &context,
2541                &worker_trace_context,
2542                &options.trace.tenant,
2543            );
2544
2545            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
2546                custom_worker_executor
2547                    .execute(
2548                        custom.handler.as_str(),
2549                        &payload,
2550                        email_text,
2551                        &worker_context,
2552                    )
2553                    .await
2554                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
2555                        node_id: node.id.clone(),
2556                        message,
2557                    })
2558            } else {
2559                mock_custom_worker_output(custom.handler.as_str(), &payload)
2560            };
2561
2562            if let Some(span) = handler_span.take() {
2563                span.end();
2564            }
2565
2566            let worker_output = worker_output_result?;
2567
2568            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
2569            if let Some(span) = node_span.as_mut() {
2570                if let Some(output_payload) = outputs.get(node.id.as_str()) {
2571                    span.set_attribute(
2572                        "node_output",
2573                        payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
2574                    );
2575                }
2576            }
2577            apply_set_globals(node, &outputs, workflow_input, &mut globals);
2578            apply_update_globals(node, &outputs, workflow_input, &mut globals);
2579            edge_map
2580                .get(node.id.as_str())
2581                .map(|value| value.to_string())
2582        } else {
2583            return Err(YamlWorkflowRunError::UnsupportedNodeType {
2584                node_id: node.id.clone(),
2585            });
2586        };
2587
2588        let node_kind = node.kind_name().to_string();
2589        let elapsed_ms = step_started.elapsed().as_millis();
2590        step_timings.push(YamlStepTiming {
2591            node_id: node.id.clone(),
2592            node_kind,
2593            elapsed_ms,
2594            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
2595            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
2596            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
2597            thinking_tokens: node_usage.as_ref().and_then(|usage| usage.thinking_tokens),
2598            tokens_per_second: node_usage
2599                .as_ref()
2600                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
2601        });
2602
2603        if let Some(usage) = node_usage.as_ref() {
2604            llm_node_metrics.insert(
2605                node.id.clone(),
2606                YamlLlmNodeMetrics {
2607                    elapsed_ms,
2608                    prompt_tokens: usage.prompt_tokens,
2609                    completion_tokens: usage.completion_tokens,
2610                    total_tokens: usage.total_tokens,
2611                    thinking_tokens: usage.thinking_tokens,
2612                    tokens_per_second: completion_tokens_per_second(
2613                        usage.completion_tokens,
2614                        elapsed_ms,
2615                    ),
2616                },
2617            );
2618        }
2619
2620        if let Some(mut span) = node_span.take() {
2621            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
2622            span.add_event("node_completed");
2623            span.end();
2624        }
2625
2626        if let Some(sink) = event_sink {
2627            sink.emit(&YamlWorkflowEvent {
2628                event_type: "node_completed".to_string(),
2629                node_id: Some(node.id.clone()),
2630                step_id: Some(node.id.clone()),
2631                node_kind: Some(node.kind_name().to_string()),
2632                streamable: node_streamable,
2633                message: None,
2634                delta: None,
2635                token_kind: None,
2636                is_terminal_node_token: None,
2637                elapsed_ms: Some(elapsed_ms),
2638                metadata: None,
2639            });
2640        }
2641
2642        if event_sink_is_cancelled(event_sink) {
2643            return Err(YamlWorkflowRunError::EventSinkCancelled {
2644                message: workflow_event_sink_cancelled_message().to_string(),
2645            });
2646        }
2647
2648        if let Some(next) = next {
2649            current = next;
2650            continue;
2651        }
2652        break;
2653    }
2654
2655    let terminal_node = trace
2656        .last()
2657        .cloned()
2658        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
2659            workflow_id: workflow.id.clone(),
2660        })?;
2661
2662    let terminal_output = outputs
2663        .get(terminal_node.as_str())
2664        .and_then(|value| value.get("output"))
2665        .cloned();
2666
2667    let total_elapsed_ms = started.elapsed().as_millis();
2668    let output = YamlWorkflowRunOutput {
2669        workflow_id: workflow.id.clone(),
2670        entry_node: workflow.entry_node.clone(),
2671        email_text: email_text.to_string(),
2672        trace,
2673        outputs,
2674        terminal_node,
2675        terminal_output,
2676        step_timings,
2677        llm_node_metrics,
2678        total_elapsed_ms,
2679        ttft_ms: workflow_ttft_ms,
2680        total_input_tokens: token_totals.input_tokens,
2681        total_output_tokens: token_totals.output_tokens,
2682        total_tokens: token_totals.total_tokens,
2683        total_thinking_tokens: token_totals.thinking_tokens,
2684        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
2685        trace_id: trace_id.clone(),
2686        metadata: trace_id
2687            .as_ref()
2688            .map(|value| workflow_metadata_with_trace(options, value)),
2689    };
2690
2691    if let Some(sink) = event_sink {
2692        let event_metadata = if options.telemetry.nerdstats {
2693            Some(json!({
2694                "nerdstats": workflow_nerdstats(&output),
2695            }))
2696        } else {
2697            None
2698        };
2699        sink.emit(&YamlWorkflowEvent {
2700            event_type: "workflow_completed".to_string(),
2701            node_id: None,
2702            step_id: None,
2703            node_kind: None,
2704            streamable: None,
2705            message: Some(format!("terminal_node={}", output.terminal_node)),
2706            delta: None,
2707            token_kind: None,
2708            is_terminal_node_token: None,
2709            elapsed_ms: Some(output.total_elapsed_ms),
2710            metadata: event_metadata,
2711        });
2712    }
2713
2714    if event_sink_is_cancelled(event_sink) {
2715        return Err(YamlWorkflowRunError::EventSinkCancelled {
2716            message: workflow_event_sink_cancelled_message().to_string(),
2717        });
2718    }
2719
2720    workflow_span.set_attribute("workflow_id", workflow.id.as_str());
2721    if let Some(trace_id_value) = trace_id.as_ref() {
2722        workflow_span.set_attribute("trace_id", trace_id_value.as_str());
2723    }
2724    workflow_span.end();
2725
2726    Ok(output)
2727}
2728
/// Attempts to run a YAML workflow by lowering it to IR and executing it on
/// [`WorkflowRuntime`].
///
/// Every node is dispatched through the runtime's tool executor: tool calls whose
/// id equals `YAML_LLM_TOOL_ID` are forwarded to `executor`, and every other tool
/// id is forwarded to `custom_worker`. The runtime's direct LLM hook is a stub that
/// always errors.
///
/// # Returns
/// - `Ok(Some(output))` when the IR runtime completed the workflow.
/// - `Ok(None)` when this path is not applicable: `yaml_workflow_to_ir` reported
///   `UnsupportedNode` / `MultipleOutgoingEdge`, or the runtime returned
///   `WorkflowRuntimeError::Validation`. Presumably the caller falls back to a
///   different runner in that case — confirm at the call site.
///
/// # Errors
/// - `YamlWorkflowRunError::InvalidInput` for any other YAML-to-IR conversion error.
/// - `YamlWorkflowRunError::IrRuntime` for any other runtime error.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lower the YAML definition to IR. Two specific conversion errors mean "not
    // representable on this path" and yield `Ok(None)`; anything else is a hard error.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    // The workflow span is started unconditionally; the trace id and tenant
    // attributes are only attached when telemetry is enabled.
    let tracer = workflow_tracer();
    let parent_trace_context = trace_context_from_options(options);
    let (workflow_trace_context, mut workflow_span) = tracer.start_span(
        "workflow.run",
        SpanKind::Workflow,
        parent_trace_context.as_ref(),
    );
    let trace_id = if options.telemetry.enabled {
        let value = resolve_trace_id(options, &workflow_trace_context);
        workflow_span.set_attribute("trace_id", value.as_str());
        apply_trace_tenant_attributes(workflow_span.as_mut(), options);
        Some(value)
    } else {
        None
    };

    /// Stub LLM executor handed to the runtime. It always fails, because LLM nodes
    /// on this path are executed through the tool executor instead.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    /// Tool executor that bridges IR tool calls to the YAML LLM executor and the
    /// optional custom worker, recording token usage as it goes.
    struct YamlIrToolExecutor<'a> {
        // Target for tool calls whose id is `YAML_LLM_TOOL_ID`.
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        // Target for every other tool id; `None` makes those calls fail with `NotFound`.
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        // Running usage totals across all LLM calls; behind a mutex because
        // `execute_tool` only receives `&self`.
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        // Usage of LLM calls keyed by node id (a later call for the same id overwrites).
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        // `Some` only when telemetry is enabled; also gates handler span creation.
        trace_id: Option<String>,
        // Parent context used when starting `handler.invoke` spans.
        trace_context: Option<TraceContext>,
        // Caller-supplied trace context, copied from the run options.
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        // Tenant identifiers attached to handler spans and worker context.
        tenant_context: YamlWorkflowTraceTenantContext,
        // Passed to `payload_for_span` when recording `node_input` on handler spans.
        payload_mode: YamlWorkflowPayloadMode,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        /// Executes one IR tool call.
        ///
        /// When `input.tool == YAML_LLM_TOOL_ID` the call is treated as an LLM node and
        /// its payload is returned; otherwise the call is delegated to the custom worker.
        ///
        /// Fails with `ToolExecutionError::Failed` when required LLM fields are missing
        /// or the underlying executor/worker returns an error, and with
        /// `ToolExecutionError::NotFound` when no custom worker is configured.
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // Template context (`input` / `nodes` / `globals`) derived from the IR scope.
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            if input.tool == YAML_LLM_TOOL_ID {
                // `node_id` and `model` are mandatory; the remaining fields fall back
                // to defaults when absent or of the wrong JSON type.
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                // Separate copy because `node_id` is moved into the request below.
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // Messages are only resolved when a path was supplied; a resolution
                // failure aborts the tool call.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                // Read from `input.email_text` in the context; empty when missing or
                // not a string.
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // On this path the request never carries tools or a tool choice, the
                // node is always reported as non-terminal, and tool tracing is off.
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model,
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                };

                // No event sink is passed to the executor here.
                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record usage on success. A failed `lock()` is skipped silently, so
                // the affected accumulator simply misses this call's usage.
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics, usage.clone());
                        }
                    }
                }

                // Only the payload is returned; other fields of the result are dropped.
                return llm_result.map(|result| result.payload);
            }

            // Any other tool id is a custom worker handler named by `input.tool`.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // A `handler.invoke` span is only created when a trace id exists.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_id.is_some() {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                if let Some(trace_id) = self.trace_id.as_ref() {
                    span.set_attribute("trace_id", trace_id.as_str());
                }
                span.set_attribute("handler_name", input.tool.as_str());
                // Tenant attributes are attached only for the ids that are present.
                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
                    span.set_attribute("tenant.workspace_id", workspace_id);
                }
                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
                    span.set_attribute("tenant.user_id", user_id);
                }
                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
                    span.set_attribute("tenant.conversation_id", conversation_id);
                }
                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
                    span.set_attribute("tenant.request_id", request_id);
                }
                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
                    span.set_attribute("tenant.run_id", run_id);
                }
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Temporary options value built only to call
            // `merged_trace_context_for_worker`; its telemetry config is the default.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            // Mark the outcome on the span, then end it before returning the result.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        trace_id: trace_id.clone(),
        // The workflow trace context is only shared when a trace id exists.
        trace_context: trace_id.as_ref().map(|_| workflow_trace_context.clone()),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
    };

    let runtime = WorkflowRuntime::new(
        ir,
        &NoopLlm,
        Some(&tool_executor),
        WorkflowRuntimeOptions::default(),
    );

    // NOTE(review): both early returns below leave the function without calling
    // `workflow_span.end()`; whether dropping the span is sufficient depends on the
    // tracer implementation — confirm.
    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Wrap each node output as `{"output": ...}` and drop the synthetic start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    // Snapshot of per-node usage; a failed `lock()` yields an empty map.
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    // Per-node elapsed time is not measured on this path: `elapsed_ms` is always 0
    // and the throughput helper is called with an elapsed value of 0.
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        trace.push(execution.node_id.clone());
        let usage = node_usage_map.get(&execution.node_id);
        if let Some(usage) = usage {
            llm_node_metrics.insert(
                execution.node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    thinking_tokens: usage.thinking_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        // Every step is labelled "ir_runtime" rather than with its YAML node kind.
        step_timings.push(YamlStepTiming {
            node_id: execution.node_id,
            node_kind: "ir_runtime".to_string(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            thinking_tokens: usage.and_then(|value| value.thinking_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    // Empty when the workflow input has no string `email_text` field.
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    // Snapshot of the accumulated totals; a failed `lock()` yields default totals.
    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    workflow_span.set_attribute("workflow_id", workflow.id.as_str());
    workflow_span.end();

    // `ttft_ms` is always `None` on this path; metadata is only attached when a
    // trace id exists.
    Ok(Some(YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_thinking_tokens: token_totals.thinking_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: trace_id.clone(),
        metadata: trace_id
            .as_ref()
            .map(|value| workflow_metadata_with_trace(options, value)),
    }))
}
3121
3122fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
3123    let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
3124
3125    let mut nodes = serde_json::Map::new();
3126    if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
3127        for (node_id, output) in node_outputs {
3128            nodes.insert(node_id.clone(), json!({"output": output.clone()}));
3129        }
3130    }
3131
3132    json!({
3133        "input": input,
3134        "nodes": Value::Object(nodes),
3135        "globals": Value::Object(serde_json::Map::new())
3136    })
3137}
3138
3139pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
3140    workflow: &YamlWorkflow,
3141    email_text: &str,
3142    executor: &dyn YamlWorkflowLlmExecutor,
3143    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3144    event_sink: Option<&dyn YamlWorkflowEventSink>,
3145) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3146    let workflow_input = json!({ "email_text": email_text });
3147    run_workflow_yaml_with_custom_worker_and_events(
3148        workflow,
3149        &workflow_input,
3150        executor,
3151        custom_worker,
3152        event_sink,
3153    )
3154    .await
3155}
3156
3157fn evaluate_switch_condition(
3158    condition: &str,
3159    context: &Value,
3160) -> Result<bool, YamlWorkflowRunError> {
3161    let (left, right) =
3162        condition
3163            .split_once("==")
3164            .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
3165                condition: condition.to_string(),
3166            })?;
3167
3168    let left_path = left.trim().trim_start_matches("$.");
3169    let right_literal = right.trim().trim_matches('"').trim_matches('\'');
3170    let left_value = resolve_path(context, left_path);
3171    Ok(left_value
3172        .and_then(Value::as_str)
3173        .map(|value| value == right_literal)
3174        .unwrap_or(false))
3175}
3176
3177fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
3178    let normalized_path = path.trim().trim_start_matches("$.");
3179    let value = resolve_path(context, normalized_path)
3180        .ok_or_else(|| format!("messages_path not found: {path}"))?;
3181    let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
3182        format!("messages_path must resolve to a list of messages: {path}; {err}")
3183    })?;
3184    if list.is_empty() {
3185        return Err(format!(
3186            "messages_path must not resolve to an empty list: {path}"
3187        ));
3188    }
3189
3190    let mut messages = Vec::with_capacity(list.len());
3191    for (index, item) in list.into_iter().enumerate() {
3192        let mut message = match item.role {
3193            Role::System => Message::system(item.content),
3194            Role::User => Message::user(item.content),
3195            Role::Assistant => Message::assistant(item.content),
3196            Role::Tool => {
3197                let tool_call_id = item
3198                    .tool_call_id
3199                    .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
3200                Message::tool(item.content, tool_call_id)
3201            }
3202        };
3203
3204        if let Some(name) = item.name {
3205            message = message.with_name(name);
3206        }
3207
3208        messages.push(message);
3209    }
3210
3211    Ok(messages)
3212}
3213
3214pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
3215    let mut diagnostics = Vec::new();
3216    let known_ids: HashMap<&str, &YamlNode> = workflow
3217        .nodes
3218        .iter()
3219        .map(|node| (node.id.as_str(), node))
3220        .collect();
3221
3222    if !known_ids.contains_key(workflow.entry_node.as_str()) {
3223        diagnostics.push(YamlWorkflowDiagnostic {
3224            node_id: None,
3225            code: "missing_entry".to_string(),
3226            severity: YamlWorkflowDiagnosticSeverity::Error,
3227            message: format!("entry node '{}' does not exist", workflow.entry_node),
3228        });
3229    }
3230
3231    for edge in &workflow.edges {
3232        if !known_ids.contains_key(edge.from.as_str()) {
3233            diagnostics.push(YamlWorkflowDiagnostic {
3234                node_id: Some(edge.from.clone()),
3235                code: "unknown_edge_from".to_string(),
3236                severity: YamlWorkflowDiagnosticSeverity::Error,
3237                message: format!("edge.from '{}' does not exist", edge.from),
3238            });
3239        }
3240        if !known_ids.contains_key(edge.to.as_str()) {
3241            diagnostics.push(YamlWorkflowDiagnostic {
3242                node_id: Some(edge.to.clone()),
3243                code: "unknown_edge_to".to_string(),
3244                severity: YamlWorkflowDiagnosticSeverity::Error,
3245                message: format!("edge.to '{}' does not exist", edge.to),
3246            });
3247        }
3248    }
3249
3250    for node in &workflow.nodes {
3251        if let Some(llm) = &node.node_type.llm_call {
3252            if llm.model.trim().is_empty() {
3253                diagnostics.push(YamlWorkflowDiagnostic {
3254                    node_id: Some(node.id.clone()),
3255                    code: "empty_model".to_string(),
3256                    severity: YamlWorkflowDiagnosticSeverity::Error,
3257                    message: "llm_call.model must not be empty".to_string(),
3258                });
3259            }
3260            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
3261                diagnostics.push(YamlWorkflowDiagnostic {
3262                    node_id: Some(node.id.clone()),
3263                    code: "stream_heal_conflict".to_string(),
3264                    severity: YamlWorkflowDiagnosticSeverity::Warning,
3265                    message:
3266                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
3267                            .to_string(),
3268                });
3269            }
3270
3271            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
3272                diagnostics.push(YamlWorkflowDiagnostic {
3273                    node_id: Some(node.id.clone()),
3274                    code: "invalid_max_tool_roundtrips".to_string(),
3275                    severity: YamlWorkflowDiagnosticSeverity::Error,
3276                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
3277                });
3278            }
3279
3280            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
3281                if global_key.trim().is_empty() {
3282                    diagnostics.push(YamlWorkflowDiagnostic {
3283                        node_id: Some(node.id.clone()),
3284                        code: "empty_tool_calls_global_key".to_string(),
3285                        severity: YamlWorkflowDiagnosticSeverity::Error,
3286                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
3287                    });
3288                }
3289            }
3290
3291            match normalize_tool_choice(llm.tool_choice.clone()) {
3292                Ok(choice) => {
3293                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
3294                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
3295                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
3296                                openai.function.name == choice_tool.function.name
3297                            }
3298                            (
3299                                YamlToolFormat::Simplified,
3300                                YamlToolDeclaration::Simplified(simple),
3301                            ) => simple.name == choice_tool.function.name,
3302                            _ => false,
3303                        }) {
3304                            diagnostics.push(YamlWorkflowDiagnostic {
3305                                node_id: Some(node.id.clone()),
3306                                code: "unknown_tool_choice_function".to_string(),
3307                                severity: YamlWorkflowDiagnosticSeverity::Error,
3308                                message: format!(
3309                                    "llm_call.tool_choice references unknown function '{}'",
3310                                    choice_tool.function.name
3311                                ),
3312                            });
3313                        }
3314                    }
3315                }
3316                Err(message) => {
3317                    diagnostics.push(YamlWorkflowDiagnostic {
3318                        node_id: Some(node.id.clone()),
3319                        code: "invalid_tool_choice".to_string(),
3320                        severity: YamlWorkflowDiagnosticSeverity::Error,
3321                        message,
3322                    });
3323                }
3324            }
3325
3326            let normalized_tools = match normalize_llm_tools(llm) {
3327                Ok(tools) => tools,
3328                Err(message) => {
3329                    diagnostics.push(YamlWorkflowDiagnostic {
3330                        node_id: Some(node.id.clone()),
3331                        code: "invalid_tools_format".to_string(),
3332                        severity: YamlWorkflowDiagnosticSeverity::Error,
3333                        message,
3334                    });
3335                    Vec::new()
3336                }
3337            };
3338
3339            let mut seen_tool_names = HashSet::new();
3340            for tool in &normalized_tools {
3341                let name = tool.definition.function.name.trim();
3342                if name.is_empty() {
3343                    diagnostics.push(YamlWorkflowDiagnostic {
3344                        node_id: Some(node.id.clone()),
3345                        code: "empty_tool_name".to_string(),
3346                        severity: YamlWorkflowDiagnosticSeverity::Error,
3347                        message: "tool function name must not be empty".to_string(),
3348                    });
3349                }
3350                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
3351                    diagnostics.push(YamlWorkflowDiagnostic {
3352                        node_id: Some(node.id.clone()),
3353                        code: "duplicate_tool_name".to_string(),
3354                        severity: YamlWorkflowDiagnosticSeverity::Error,
3355                        message: format!(
3356                            "duplicate tool function name '{}' in node",
3357                            tool.definition.function.name
3358                        ),
3359                    });
3360                }
3361
3362                let schema = tool
3363                    .definition
3364                    .function
3365                    .parameters
3366                    .clone()
3367                    .unwrap_or(Value::Null);
3368                if schema.is_null() {
3369                    diagnostics.push(YamlWorkflowDiagnostic {
3370                        node_id: Some(node.id.clone()),
3371                        code: "missing_tool_input_schema".to_string(),
3372                        severity: YamlWorkflowDiagnosticSeverity::Error,
3373                        message: format!(
3374                            "tool '{}' is missing input schema",
3375                            tool.definition.function.name
3376                        ),
3377                    });
3378                } else if let Err(message) = validate_json_schema(&schema) {
3379                    diagnostics.push(YamlWorkflowDiagnostic {
3380                        node_id: Some(node.id.clone()),
3381                        code: "invalid_tool_input_schema".to_string(),
3382                        severity: YamlWorkflowDiagnosticSeverity::Error,
3383                        message: format!(
3384                            "tool '{}' has invalid input schema: {}",
3385                            tool.definition.function.name, message
3386                        ),
3387                    });
3388                }
3389
3390                if let Some(output_schema) = tool.output_schema.as_ref() {
3391                    if let Err(message) = validate_json_schema(output_schema) {
3392                        diagnostics.push(YamlWorkflowDiagnostic {
3393                            node_id: Some(node.id.clone()),
3394                            code: "invalid_tool_output_schema".to_string(),
3395                            severity: YamlWorkflowDiagnosticSeverity::Error,
3396                            message: format!(
3397                                "tool '{}' has invalid output schema: {}",
3398                                tool.definition.function.name, message
3399                            ),
3400                        });
3401                    }
3402                }
3403            }
3404        }
3405
3406        if let Some(switch) = &node.node_type.switch {
3407            for branch in &switch.branches {
3408                if !known_ids.contains_key(branch.target.as_str()) {
3409                    diagnostics.push(YamlWorkflowDiagnostic {
3410                        node_id: Some(node.id.clone()),
3411                        code: "unknown_switch_target".to_string(),
3412                        severity: YamlWorkflowDiagnosticSeverity::Error,
3413                        message: format!("switch branch target '{}' does not exist", branch.target),
3414                    });
3415                }
3416            }
3417            if !known_ids.contains_key(switch.default.as_str()) {
3418                diagnostics.push(YamlWorkflowDiagnostic {
3419                    node_id: Some(node.id.clone()),
3420                    code: "unknown_switch_default".to_string(),
3421                    severity: YamlWorkflowDiagnosticSeverity::Error,
3422                    message: format!("switch default target '{}' does not exist", switch.default),
3423                });
3424            }
3425        }
3426
3427        if let Some(config) = node.config.as_ref() {
3428            if let Some(update_globals) = config.update_globals.as_ref() {
3429                for (key, update) in update_globals {
3430                    let is_valid_op =
3431                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
3432                    if !is_valid_op {
3433                        diagnostics.push(YamlWorkflowDiagnostic {
3434                            node_id: Some(node.id.clone()),
3435                            code: "unknown_update_op".to_string(),
3436                            severity: YamlWorkflowDiagnosticSeverity::Error,
3437                            message: format!(
3438                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
3439                                key, update.op
3440                            ),
3441                        });
3442                    }
3443
3444                    if update.op != "increment" && update.from.is_none() {
3445                        diagnostics.push(YamlWorkflowDiagnostic {
3446                            node_id: Some(node.id.clone()),
3447                            code: "missing_update_from".to_string(),
3448                            severity: YamlWorkflowDiagnosticSeverity::Error,
3449                            message: format!(
3450                                "update_globals key '{}' with op '{}' requires 'from'",
3451                                key, update.op
3452                            ),
3453                        });
3454                    }
3455                }
3456            }
3457        }
3458    }
3459
3460    diagnostics
3461}
3462
3463fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
3464    path.split('.')
3465        .filter(|segment| !segment.is_empty())
3466        .try_fold(value, |current, segment| {
3467            if let Ok(index) = segment.parse::<usize>() {
3468                return current.get(index);
3469            }
3470            current.get(segment)
3471        })
3472}
3473
3474fn interpolate_template(template: &str, context: &Value) -> String {
3475    let mut out = String::with_capacity(template.len());
3476    let mut rest = template;
3477
3478    loop {
3479        let Some(start) = rest.find("{{") else {
3480            out.push_str(rest);
3481            break;
3482        };
3483
3484        out.push_str(&rest[..start]);
3485        let after_start = &rest[start + 2..];
3486        let Some(end) = after_start.find("}}") else {
3487            out.push_str(&rest[start..]);
3488            break;
3489        };
3490
3491        let expr = after_start[..end].trim();
3492        let source_path = expr.trim_start_matches("$.");
3493        let replacement = resolve_path(context, source_path)
3494            .map(value_to_template_string)
3495            .unwrap_or_default();
3496        out.push_str(replacement.as_str());
3497
3498        rest = &after_start[end + 2..];
3499    }
3500
3501    out
3502}
3503
3504fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
3505    let mut bindings = Vec::new();
3506    let mut rest = template;
3507
3508    loop {
3509        let Some(start) = rest.find("{{") else {
3510            break;
3511        };
3512
3513        let after_start = &rest[start + 2..];
3514        let Some(end) = after_start.find("}}") else {
3515            break;
3516        };
3517
3518        let expr = after_start[..end].trim();
3519        let source_path = expr.trim_start_matches("$.").to_string();
3520        let resolved = resolve_path(context, source_path.as_str()).cloned();
3521        let missing = resolved.is_none();
3522        let resolved_value = resolved.unwrap_or(Value::Null);
3523        bindings.push(YamlTemplateBinding {
3524            index: bindings.len(),
3525            expression: expr.to_string(),
3526            source_path,
3527            resolved_type: json_type_name(&resolved_value).to_string(),
3528            missing,
3529            resolved: resolved_value,
3530        });
3531
3532        rest = &after_start[end + 2..];
3533    }
3534
3535    bindings
3536}
3537
3538fn json_type_name(value: &Value) -> &'static str {
3539    match value {
3540        Value::Null => "null",
3541        Value::Bool(_) => "bool",
3542        Value::Number(_) => "number",
3543        Value::String(_) => "string",
3544        Value::Array(_) => "array",
3545        Value::Object(_) => "object",
3546    }
3547}
3548
3549fn value_to_template_string(value: &Value) -> String {
3550    match value {
3551        Value::Null => String::new(),
3552        Value::Bool(v) => v.to_string(),
3553        Value::Number(v) => v.to_string(),
3554        Value::String(v) => v.clone(),
3555        Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
3556    }
3557}
3558
3559fn apply_set_globals(
3560    node: &YamlNode,
3561    outputs: &BTreeMap<String, Value>,
3562    workflow_input: &Value,
3563    globals: &mut serde_json::Map<String, Value>,
3564) {
3565    let Some(config) = node.config.as_ref() else {
3566        return;
3567    };
3568    let Some(set_globals) = config.set_globals.as_ref() else {
3569        return;
3570    };
3571
3572    let context = json!({
3573        "input": workflow_input,
3574        "nodes": outputs,
3575        "globals": Value::Object(globals.clone())
3576    });
3577
3578    for (key, expr) in set_globals {
3579        let value = resolve_path(&context, expr.as_str())
3580            .cloned()
3581            .unwrap_or(Value::Null);
3582        globals.insert(key.clone(), value);
3583    }
3584}
3585
3586fn apply_update_globals(
3587    node: &YamlNode,
3588    outputs: &BTreeMap<String, Value>,
3589    workflow_input: &Value,
3590    globals: &mut serde_json::Map<String, Value>,
3591) {
3592    let Some(config) = node.config.as_ref() else {
3593        return;
3594    };
3595    let Some(update_globals) = config.update_globals.as_ref() else {
3596        return;
3597    };
3598
3599    let context = json!({
3600        "input": workflow_input,
3601        "nodes": outputs,
3602        "globals": Value::Object(globals.clone())
3603    });
3604
3605    for (key, update) in update_globals {
3606        match update.op.as_str() {
3607            "set" => {
3608                if let Some(path) = update.from.as_ref() {
3609                    let value = resolve_path(&context, path.as_str())
3610                        .cloned()
3611                        .unwrap_or(Value::Null);
3612                    globals.insert(key.clone(), value);
3613                }
3614            }
3615            "append" => {
3616                if let Some(path) = update.from.as_ref() {
3617                    let value = resolve_path(&context, path.as_str())
3618                        .cloned()
3619                        .unwrap_or(Value::Null);
3620                    let entry = globals
3621                        .entry(key.clone())
3622                        .or_insert_with(|| Value::Array(Vec::new()));
3623                    match entry {
3624                        Value::Array(items) => items.push(value),
3625                        other => {
3626                            let existing = other.clone();
3627                            *other = Value::Array(vec![existing, value]);
3628                        }
3629                    }
3630                }
3631            }
3632            "increment" => {
3633                let by = update.by.unwrap_or(1.0);
3634                let current = globals
3635                    .get(key.as_str())
3636                    .and_then(Value::as_f64)
3637                    .unwrap_or(0.0);
3638                if let Some(next) = serde_json::Number::from_f64(current + by) {
3639                    globals.insert(key.clone(), Value::Number(next));
3640                }
3641            }
3642            "merge" => {
3643                if let Some(path) = update.from.as_ref() {
3644                    let source = resolve_path(&context, path.as_str())
3645                        .cloned()
3646                        .unwrap_or(Value::Null);
3647                    if let Value::Object(source_map) = source {
3648                        let target = globals
3649                            .entry(key.clone())
3650                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
3651                        if let Value::Object(target_map) = target {
3652                            target_map.extend(source_map);
3653                        } else {
3654                            *target = Value::Object(source_map);
3655                        }
3656                    }
3657                }
3658            }
3659            _ => {}
3660        }
3661    }
3662}
3663
3664fn llm_output_schema_for_node(node: &YamlNode) -> Value {
3665    if let Some(schema) = node
3666        .config
3667        .as_ref()
3668        .and_then(|cfg| cfg.output_schema.clone())
3669    {
3670        return schema;
3671    }
3672
3673    default_llm_output_schema()
3674}
3675
3676fn normalize_tool_choice(
3677    config: Option<YamlToolChoiceConfig>,
3678) -> Result<Option<ToolChoice>, String> {
3679    let Some(config) = config else {
3680        return Ok(None);
3681    };
3682
3683    let choice = match config {
3684        YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
3685        YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
3686            tool_type: ToolType::Function,
3687            function: ToolChoiceFunction { name: function },
3688        }),
3689        YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
3690    };
3691
3692    Ok(Some(choice))
3693}
3694
3695fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
3696    llm.tools
3697        .iter()
3698        .map(|tool| match (llm.tools_format, tool) {
3699            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
3700                let definition = ToolDefinition {
3701                    tool_type: openai.tool_type.unwrap_or(ToolType::Function),
3702                    function: ToolFunction {
3703                        name: openai.function.name.clone(),
3704                        description: openai.function.description.clone(),
3705                        parameters: openai.function.parameters.clone(),
3706                    },
3707                };
3708                Ok(YamlResolvedTool {
3709                    definition,
3710                    output_schema: openai.function.output_schema.clone(),
3711                })
3712            }
3713            (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
3714                let definition = ToolDefinition {
3715                    tool_type: ToolType::Function,
3716                    function: ToolFunction {
3717                        name: simple.name.clone(),
3718                        description: simple.description.clone(),
3719                        parameters: Some(simple.input_schema.clone()),
3720                    },
3721                };
3722                Ok(YamlResolvedTool {
3723                    definition,
3724                    output_schema: simple.output_schema.clone(),
3725                })
3726            }
3727            (YamlToolFormat::Openai, _) => {
3728                Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
3729            }
3730            (YamlToolFormat::Simplified, _) => {
3731                Err("tools_format=simplified requires simplified tool declarations".to_string())
3732            }
3733        })
3734        .collect()
3735}
3736
3737fn default_llm_output_schema() -> Value {
3738    json!({
3739        "type": "object",
3740        "additionalProperties": true
3741    })
3742}
3743
3744fn mock_rag(topic: &str) -> Value {
3745    let (kb_source, playbook) = match topic {
3746        "probation" => (
3747            "hr_policy/probation.md",
3748            "Collect manager review, performance evidence, and probation timeline.",
3749        ),
3750        "leave_request" => (
3751            "hr_policy/leave.md",
3752            "Validate leave balance, manager approval, and blackout dates.",
3753        ),
3754        "supply_chain_order_assessment" => (
3755            "supply_chain/order_assessment.md",
3756            "Review order specs, inventory risk, and vendor lead-time guidance.",
3757        ),
3758        "supply_chain_order_replacement" => (
3759            "supply_chain/order_replacement.md",
3760            "Collect order id, damage proof, and replacement SLA policy.",
3761        ),
3762        "termination_first_time_offense" => (
3763            "hr_policy/termination_first_offense.md",
3764            "Validate first-incident criteria and route to HRBP review.",
3765        ),
3766        "termination_repeated_offense" => (
3767            "hr_policy/termination_repeated_offense.md",
3768            "Collect prior warnings and escalation approvals before final action.",
3769        ),
3770        _ => (
3771            "shared/request_clarification.md",
3772            "Request clarifying details before routing.",
3773        ),
3774    };
3775
3776    json!({
3777        "kb_source": kb_source,
3778        "playbook": playbook,
3779    })
3780}
3781
3782fn mock_custom_worker_output(
3783    handler: &str,
3784    payload: &Value,
3785) -> Result<Value, YamlWorkflowRunError> {
3786    if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
3787        let mut value = mock_rag(topic);
3788        if let Value::Object(object) = &mut value {
3789            object.insert("handler".to_string(), Value::String(handler.to_string()));
3790        }
3791        return Ok(value);
3792    }
3793
3794    Err(YamlWorkflowRunError::UnsupportedCustomHandler {
3795        handler: handler.to_string(),
3796    })
3797}
3798
/// A workflow definition as deserialized from its YAML document.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Identifier of the workflow.
    pub id: String,
    /// Id of the entry node; `verify_yaml_workflow` reports `missing_entry`
    /// when no node in `nodes` carries this id.
    pub entry_node: String,
    /// Nodes of the workflow; defaults to an empty list when omitted.
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Edges between nodes; defaults to an empty list when omitted.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
3808
/// A single workflow node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Node id, referenced by edges, switch targets and the workflow entry node.
    pub id: String,
    /// Kind-specific settings (LLM call, switch or custom worker).
    pub node_type: YamlNodeType,
    /// Optional additional configuration (prompt, schema, payload, global updates).
    pub config: Option<YamlNodeConfig>,
}
3815
3816impl YamlNode {
3817    fn kind_name(&self) -> &'static str {
3818        if self.node_type.llm_call.is_some() {
3819            "llm_call"
3820        } else if self.node_type.switch.is_some() {
3821            "switch"
3822        } else if self.node_type.custom_worker.is_some() {
3823            "custom_worker"
3824        } else {
3825            "unknown"
3826        }
3827    }
3828}
3829
/// Kind-specific node settings; each field is present only when the node
/// declares that kind. `YamlNode::kind_name` checks them in the order
/// `llm_call`, `switch`, `custom_worker`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    /// Settings for an LLM call node.
    pub llm_call: Option<YamlLlmCall>,
    /// Settings for a switch (branching) node.
    pub switch: Option<YamlSwitch>,
    /// Settings for a custom worker node.
    pub custom_worker: Option<YamlCustomWorker>,
}
3836
/// Settings of an `llm_call` node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model identifier; verification rejects an empty or whitespace-only value.
    pub model: String,
    /// Requests streaming. Verification warns that combining this with
    /// `heal = true` is not streamable and that the runtime disables streaming.
    pub stream: Option<bool>,
    /// NOTE(review): consumed outside this section; presumably controls how
    /// streamed JSON is surfaced - confirm against the runner.
    pub stream_json_as_text: Option<bool>,
    /// Enables healing of the response; see `stream` for the interaction.
    pub heal: Option<bool>,
    /// NOTE(review): presumably the context path handed to
    /// `parse_messages_from_context` - confirm against the caller.
    pub messages_path: Option<String>,
    /// NOTE(review): consumed outside this section; presumably appends the
    /// node prompt as a user message - confirm against the runner.
    pub append_prompt_as_user: Option<bool>,
    /// Declaration style expected for every entry of `tools`; defaults to
    /// `YamlToolFormat::Openai`.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tools offered to the model; defaults to an empty list.
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional tool-choice setting; a named function must be declared in `tools`.
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Limit on tool round-trips; verification requires a value >= 1 when set.
    pub max_tool_roundtrips: Option<u8>,
    /// Key for tool calls in globals; verification rejects an empty or
    /// whitespace-only value when set.
    pub tool_calls_global_key: Option<String>,
}
3853
/// Declaration style used by the `tools` list of an LLM call.
///
/// Serialized in snake case (`openai`, `simplified`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    /// OpenAI-style declarations (`YamlOpenAiToolDeclaration`); the default.
    #[default]
    Openai,
    /// Flat declarations (`YamlSimplifiedToolDeclaration`).
    Simplified,
}
3861
/// One tool declaration in either supported shape.
///
/// The enum is untagged: deserialization tries the variants in declaration
/// order and keeps the first one that matches the input.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    /// OpenAI-style declaration with a nested `function` object.
    OpenAi(YamlOpenAiToolDeclaration),
    /// Flat declaration with `name` and `input_schema` at the top level.
    Simplified(YamlSimplifiedToolDeclaration),
}
3868
/// OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// Tool type read from the `type` key; `normalize_llm_tools` falls back to
    /// `ToolType::Function` when it is omitted.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    /// The function being declared.
    pub function: YamlOpenAiToolFunction,
}
3875
/// Function section of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    /// Function name; verification rejects empty and duplicate names.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Input schema; verification reports a missing schema when this is
    /// absent or null, and an invalid one when schema validation fails.
    pub parameters: Option<Value>,
    /// Optional schema for the tool output, validated during verification when present.
    pub output_schema: Option<Value>,
}
3883
/// Flat tool declaration: name, description and schemas at the top level.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    /// Tool name (required).
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Input schema (required), kept as raw JSON.
    pub input_schema: Value,
    /// Optional output schema, kept as raw JSON.
    pub output_schema: Option<Value>,
}
3891
/// Accepted spellings of the `tool_choice` setting.
///
/// The enum is untagged: serde tries the variants in declaration order and
/// keeps the first one that deserializes, so the order below is significant.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    /// A bare [`ToolChoiceMode`] value.
    Mode(ToolChoiceMode),
    /// A map with a single string-valued `function` key.
    Function { function: String },
    /// A full [`ToolChoiceTool`] object.
    OpenAi(ToolChoiceTool),
}
3899
/// Deserialized `switch` settings: a list of conditional branches plus a default.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    /// Conditional branches; empty when the key is omitted.
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Required default value, e.g. a node id such as `rag_clarification`.
    pub default: String,
}
3906
/// One branch of a [`YamlSwitch`].
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    /// Condition expression as written in the YAML, e.g.
    /// `$.nodes.classify_top_level.output.category == "termination"`.
    pub condition: String,
    /// Target associated with the condition, e.g. a node id.
    pub target: String,
}
3912
/// Deserialized `custom_worker` settings.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    /// Handler name (required), e.g. `GetRagData`.
    pub handler: String,
}
3917
/// Deserialized `config` section of a YAML node. Every key is optional.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt text; may contain placeholders such as `{{ input.email_text }}`.
    pub prompt: Option<String>,
    /// Output schema as raw JSON; the YAML key `schema` is accepted as an alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Free-form payload as raw JSON.
    pub payload: Option<Value>,
    /// NOTE(review): presumably maps a global name to a source expression — confirm against the runner.
    pub set_globals: Option<HashMap<String, String>>,
    /// Per-global update instructions, keyed by global name.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
3927
/// One update instruction inside `update_globals`.
///
/// NOTE(review): the set of supported `op` values, and which of `from` / `by`
/// each one reads, is defined by the runner and not visible here — confirm.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    /// Operation name (required).
    pub op: String,
    /// Optional string operand.
    pub from: Option<String>,
    /// Optional numeric operand.
    pub by: Option<f64>,
}
3934
/// A directed edge between two nodes, as listed under `edges` in the YAML.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the destination node.
    pub to: String,
}
3940
3941#[cfg(test)]
3942mod tests {
3943    use super::*;
3944    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
3945    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
3946    use simple_agent_type::tool::{ToolCallFunction, ToolType};
3947    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
3948    use simple_agents_core::SimpleAgentsClientBuilder;
3949    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3950    use std::sync::{Arc, Mutex, OnceLock};
3951
3952    fn stream_debug_env_lock() -> &'static Mutex<()> {
3953        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3954        LOCK.get_or_init(|| Mutex::new(()))
3955    }
3956
    /// Stateless LLM executor stub; its canned replies live in its
    /// `YamlWorkflowLlmExecutor` impl.
    struct MockExecutor;
3958
    /// Event sink that stores emitted events for later inspection.
    struct RecordingSink {
        /// Recorded events, behind a mutex because `emit` only receives `&self`.
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }
3962
    /// Event sink that reports itself as cancelled once any event has been emitted.
    struct CancelAfterFirstEventSink {
        /// Set to `true` by `emit`, read back by `is_cancelled`.
        cancelled: AtomicBool,
    }

    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        /// Ignores the event content and raises the cancellation flag.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        /// Returns the current value of the cancellation flag.
        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }
3976
    /// LLM executor stub that counts how many times it has been invoked.
    struct CountingExecutor {
        /// Number of `complete_structured` calls so far.
        call_count: AtomicUsize,
    }
3980
    /// Custom-worker stub that remembers the context it was last called with.
    struct CapturingWorker {
        /// Most recently captured context; `None` until the first capture is stored.
        context: Mutex<Option<Value>>,
    }
3984
3985    struct ToolLoopProvider;
3986
3987    #[async_trait]
3988    impl Provider for ToolLoopProvider {
3989        fn name(&self) -> &str {
3990            "openai"
3991        }
3992
3993        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
3994            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
3995            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
3996        }
3997
3998        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3999            let request: CompletionRequest =
4000                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
4001
4002            let has_tools = request
4003                .tools
4004                .as_ref()
4005                .is_some_and(|tools| !tools.is_empty());
4006            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
4007
4008            let response = if has_tools && !has_tool_result {
4009                CompletionResponse {
4010                    id: "resp_tool_1".to_string(),
4011                    model: request.model.clone(),
4012                    choices: vec![CompletionChoice {
4013                        index: 0,
4014                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
4015                            id: "call_get_context".to_string(),
4016                            tool_type: ToolType::Function,
4017                            function: ToolCallFunction {
4018                                name: "get_customer_context".to_string(),
4019                                arguments: "{\"order_id\":\"123\"}".to_string(),
4020                            },
4021                        }]),
4022                        finish_reason: FinishReason::ToolCalls,
4023                        logprobs: None,
4024                    }],
4025                    usage: Usage::new(10, 5),
4026                    created: None,
4027                    provider: Some(self.name().to_string()),
4028                    healing_metadata: None,
4029                }
4030            } else if has_tools && has_tool_result {
4031                CompletionResponse {
4032                    id: "resp_tool_2".to_string(),
4033                    model: request.model.clone(),
4034                    choices: vec![CompletionChoice {
4035                        index: 0,
4036                        message: Message::assistant("{\"state\":\"done\"}"),
4037                        finish_reason: FinishReason::Stop,
4038                        logprobs: None,
4039                    }],
4040                    usage: Usage::new(12, 6),
4041                    created: None,
4042                    provider: Some(self.name().to_string()),
4043                    healing_metadata: None,
4044                }
4045            } else {
4046                let prompt = request
4047                    .messages
4048                    .iter()
4049                    .rev()
4050                    .find(|m| m.role == Role::User)
4051                    .map(|m| m.content.clone())
4052                    .unwrap_or_default();
4053                let payload = json!({
4054                    "subject": "ok",
4055                    "body": prompt,
4056                })
4057                .to_string();
4058                CompletionResponse {
4059                    id: "resp_final".to_string(),
4060                    model: request.model.clone(),
4061                    choices: vec![CompletionChoice {
4062                        index: 0,
4063                        message: Message::assistant(payload),
4064                        finish_reason: FinishReason::Stop,
4065                        logprobs: None,
4066                    }],
4067                    usage: Usage::new(8, 4),
4068                    created: None,
4069                    provider: Some(self.name().to_string()),
4070                    healing_metadata: None,
4071                }
4072            };
4073
4074            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
4075            Ok(ProviderResponse::new(200, body))
4076        }
4077
4078        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
4079            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
4080        }
4081    }
4082
    /// Custom-worker stub that answers every call with the same payload.
    struct FixedToolWorker {
        /// Value returned (cloned) from every `execute` call.
        payload: Value,
    }

    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
        /// Ignores all arguments and returns a clone of the configured payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            Ok(self.payload.clone())
        }
    }
4099
4100    #[async_trait]
4101    impl YamlWorkflowLlmExecutor for CountingExecutor {
4102        async fn complete_structured(
4103            &self,
4104            _request: YamlLlmExecutionRequest,
4105            _event_sink: Option<&dyn YamlWorkflowEventSink>,
4106        ) -> Result<YamlLlmExecutionResult, String> {
4107            self.call_count.fetch_add(1, Ordering::SeqCst);
4108            Ok(YamlLlmExecutionResult {
4109                payload: json!({"state":"ok"}),
4110                usage: None,
4111                ttft_ms: None,
4112                tool_calls: Vec::new(),
4113            })
4114        }
4115    }
4116
4117    impl YamlWorkflowEventSink for RecordingSink {
4118        fn emit(&self, event: &YamlWorkflowEvent) {
4119            self.events
4120                .lock()
4121                .expect("recording sink lock should not be poisoned")
4122                .push(event.clone());
4123        }
4124    }
4125
4126    #[async_trait]
4127    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
4128        async fn execute(
4129            &self,
4130            _handler: &str,
4131            _payload: &Value,
4132            _email_text: &str,
4133            context: &Value,
4134        ) -> Result<Value, String> {
4135            let mut guard = self
4136                .context
4137                .lock()
4138                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
4139            *guard = Some(context.clone());
4140            Ok(json!({"ok": true}))
4141        }
4142    }
4143
4144    #[async_trait]
4145    impl YamlWorkflowLlmExecutor for MockExecutor {
4146        async fn complete_structured(
4147            &self,
4148            request: YamlLlmExecutionRequest,
4149            _event_sink: Option<&dyn YamlWorkflowEventSink>,
4150        ) -> Result<YamlLlmExecutionResult, String> {
4151            let prompt = request.prompt;
4152            if prompt.contains("exactly one category") {
4153                return Ok(YamlLlmExecutionResult {
4154                    payload: json!({"category":"termination","reason":"mock"}),
4155                    usage: Some(YamlLlmTokenUsage {
4156                        prompt_tokens: 10,
4157                        completion_tokens: 5,
4158                        total_tokens: 15,
4159                        thinking_tokens: None,
4160                    }),
4161                    ttft_ms: None,
4162                    tool_calls: Vec::new(),
4163                });
4164            }
4165            if prompt.contains("Determine termination subtype") {
4166                return Ok(YamlLlmExecutionResult {
4167                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
4168                    usage: Some(YamlLlmTokenUsage {
4169                        prompt_tokens: 12,
4170                        completion_tokens: 6,
4171                        total_tokens: 18,
4172                        thinking_tokens: None,
4173                    }),
4174                    ttft_ms: None,
4175                    tool_calls: Vec::new(),
4176                });
4177            }
4178            if prompt.contains("Determine supply chain subtype") {
4179                return Ok(YamlLlmExecutionResult {
4180                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
4181                    usage: Some(YamlLlmTokenUsage {
4182                        prompt_tokens: 11,
4183                        completion_tokens: 4,
4184                        total_tokens: 15,
4185                        thinking_tokens: None,
4186                    }),
4187                    ttft_ms: None,
4188                    tool_calls: Vec::new(),
4189                });
4190            }
4191            Err("unexpected prompt".to_string())
4192        }
4193    }
4194
4195    #[tokio::test]
4196    async fn runs_yaml_workflow_and_returns_step_timings() {
4197        let yaml = r#"
4198id: email-intake-classification
4199entry_node: classify_top_level
4200nodes:
4201  - id: classify_top_level
4202    node_type:
4203      llm_call:
4204        model: gpt-4.1
4205    config:
4206      prompt: |
4207        Classify this email into exactly one category:
4208        {{ input.email_text }}
4209  - id: route_top_level
4210    node_type:
4211      switch:
4212        branches:
4213          - condition: '$.nodes.classify_top_level.output.category == "termination"'
4214            target: classify_termination_subtype
4215        default: rag_clarification
4216  - id: classify_termination_subtype
4217    node_type:
4218      llm_call:
4219        model: gpt-4.1
4220    config:
4221      prompt: |
4222        Determine termination subtype:
4223        {{ input.email_text }}
4224  - id: route_termination_subtype
4225    node_type:
4226      switch:
4227        branches:
4228          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
4229            target: rag_termination_repeated_offense
4230        default: rag_clarification
4231  - id: rag_termination_repeated_offense
4232    node_type:
4233      custom_worker:
4234        handler: GetRagData
4235    config:
4236      payload:
4237        topic: termination_repeated_offense
4238  - id: rag_clarification
4239    node_type:
4240      custom_worker:
4241        handler: GetRagData
4242    config:
4243      payload:
4244        topic: clarification
4245edges:
4246  - from: classify_top_level
4247    to: route_top_level
4248  - from: classify_termination_subtype
4249    to: route_termination_subtype
4250"#;
4251
4252        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4253        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
4254            .await
4255            .expect("yaml workflow should execute");
4256
4257        assert_eq!(output.workflow_id, "email-intake-classification");
4258        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
4259        assert!(!output.step_timings.is_empty());
4260        assert_eq!(output.step_timings.len(), output.trace.len());
4261        assert!(output
4262            .outputs
4263            .contains_key("rag_termination_repeated_offense"));
4264        assert_eq!(output.total_input_tokens, 22);
4265        assert_eq!(output.total_output_tokens, 11);
4266        assert_eq!(output.total_tokens, 33);
4267    }
4268
4269    #[tokio::test]
4270    async fn emits_resolved_llm_input_event_with_bindings() {
4271        let yaml = r#"
4272id: email-intake-classification
4273entry_node: classify_top_level
4274nodes:
4275  - id: classify_top_level
4276    node_type:
4277      llm_call:
4278        model: gpt-4.1
4279    config:
4280      prompt: |
4281        Classify this email into exactly one category:
4282        {{ input.email_text }}
4283"#;
4284
4285        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4286        let sink = RecordingSink {
4287            events: Mutex::new(Vec::new()),
4288        };
4289
4290        let output = run_email_workflow_yaml_with_custom_worker_and_events(
4291            &workflow,
4292            "Need help with termination",
4293            &MockExecutor,
4294            None,
4295            Some(&sink),
4296        )
4297        .await
4298        .expect("yaml workflow should execute");
4299
4300        assert_eq!(output.terminal_node, "classify_top_level");
4301
4302        let events = sink
4303            .events
4304            .lock()
4305            .expect("recording sink lock should not be poisoned");
4306        let llm_event = events
4307            .iter()
4308            .find(|event| event.event_type == "node_llm_input_resolved")
4309            .expect("expected llm input telemetry event");
4310
4311        let metadata = llm_event
4312            .metadata
4313            .as_ref()
4314            .expect("llm input event must include metadata");
4315        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
4316        assert_eq!(metadata["stream_requested"], Value::Bool(false));
4317        assert_eq!(metadata["heal_requested"], Value::Bool(false));
4318        assert!(metadata["prompt"]
4319            .as_str()
4320            .expect("prompt should be a string")
4321            .contains("Need help with termination"));
4322
4323        let bindings = metadata["bindings"]
4324            .as_array()
4325            .expect("bindings should be an array");
4326        assert_eq!(bindings.len(), 1);
4327        assert_eq!(
4328            bindings[0]["source_path"],
4329            Value::String("input.email_text".to_string())
4330        );
4331        assert_eq!(
4332            bindings[0]["resolved"],
4333            Value::String("Need help with termination".to_string())
4334        );
4335        assert_eq!(bindings[0]["missing"], Value::Bool(false));
4336        assert_eq!(
4337            bindings[0]["resolved_type"],
4338            Value::String("string".to_string())
4339        );
4340    }
4341
4342    #[tokio::test]
4343    async fn workflow_completed_event_includes_nerdstats_by_default() {
4344        let yaml = r#"
4345id: nerdstats-default
4346entry_node: classify
4347nodes:
4348  - id: classify
4349    node_type:
4350      llm_call:
4351        model: gpt-4.1
4352    config:
4353      prompt: |
4354        Classify this email into exactly one category:
4355        {{ input.email_text }}
4356"#;
4357
4358        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4359        let sink = RecordingSink {
4360            events: Mutex::new(Vec::new()),
4361        };
4362
4363        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
4364            &workflow,
4365            &json!({"email_text":"hello"}),
4366            &MockExecutor,
4367            None,
4368            Some(&sink),
4369            &YamlWorkflowRunOptions::default(),
4370        )
4371        .await
4372        .expect("workflow should execute");
4373
4374        let events = sink
4375            .events
4376            .lock()
4377            .expect("recording sink lock should not be poisoned");
4378        let completed = events
4379            .iter()
4380            .find(|event| event.event_type == "workflow_completed")
4381            .expect("expected workflow_completed event");
4382        let metadata = completed
4383            .metadata
4384            .as_ref()
4385            .expect("workflow_completed should include metadata by default");
4386        let nerdstats = metadata
4387            .get("nerdstats")
4388            .expect("nerdstats should be present by default");
4389
4390        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
4391        assert_eq!(
4392            nerdstats["terminal_node"],
4393            Value::String(output.terminal_node)
4394        );
4395        assert_eq!(
4396            nerdstats["total_tokens"],
4397            Value::Number(output.total_tokens.into())
4398        );
4399        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
4400        assert_eq!(
4401            nerdstats["token_metrics_source"],
4402            Value::String("provider_usage".to_string())
4403        );
4404        assert_eq!(nerdstats["ttft_ms"], Value::Null);
4405    }
4406
4407    #[tokio::test]
4408    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
4409        let yaml = r#"
4410id: nerdstats-disabled
4411entry_node: classify
4412nodes:
4413  - id: classify
4414    node_type:
4415      llm_call:
4416        model: gpt-4.1
4417    config:
4418      prompt: |
4419        Classify this email into exactly one category:
4420        {{ input.email_text }}
4421"#;
4422
4423        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4424        let sink = RecordingSink {
4425            events: Mutex::new(Vec::new()),
4426        };
4427        let options = YamlWorkflowRunOptions {
4428            telemetry: YamlWorkflowTelemetryConfig {
4429                nerdstats: false,
4430                ..YamlWorkflowTelemetryConfig::default()
4431            },
4432            ..YamlWorkflowRunOptions::default()
4433        };
4434
4435        run_workflow_yaml_with_custom_worker_and_events_and_options(
4436            &workflow,
4437            &json!({"email_text":"hello"}),
4438            &MockExecutor,
4439            None,
4440            Some(&sink),
4441            &options,
4442        )
4443        .await
4444        .expect("workflow should execute");
4445
4446        let events = sink
4447            .events
4448            .lock()
4449            .expect("recording sink lock should not be poisoned");
4450        let completed = events
4451            .iter()
4452            .find(|event| event.event_type == "workflow_completed")
4453            .expect("expected workflow_completed event");
4454        assert!(completed.metadata.is_none());
4455    }
4456
4457    struct StreamAwareMockExecutor;
4458
4459    #[async_trait]
4460    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
4461        async fn complete_structured(
4462            &self,
4463            request: YamlLlmExecutionRequest,
4464            _event_sink: Option<&dyn YamlWorkflowEventSink>,
4465        ) -> Result<YamlLlmExecutionResult, String> {
4466            Ok(YamlLlmExecutionResult {
4467                payload: json!({"state":"ok"}),
4468                usage: Some(YamlLlmTokenUsage {
4469                    prompt_tokens: 20,
4470                    completion_tokens: 10,
4471                    total_tokens: 30,
4472                    thinking_tokens: None,
4473                }),
4474                ttft_ms: if request.stream { Some(12) } else { None },
4475                tool_calls: Vec::new(),
4476            })
4477        }
4478    }
4479
4480    #[tokio::test]
4481    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
4482        let yaml = r#"
4483id: nerdstats-streaming
4484entry_node: classify
4485nodes:
4486  - id: classify
4487    node_type:
4488      llm_call:
4489        model: gpt-4.1
4490        stream: true
4491    config:
4492      prompt: |
4493        Return JSON only:
4494        {"state":"ok"}
4495"#;
4496
4497        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4498        let sink = RecordingSink {
4499            events: Mutex::new(Vec::new()),
4500        };
4501
4502        run_workflow_yaml_with_custom_worker_and_events_and_options(
4503            &workflow,
4504            &json!({"email_text":"hello"}),
4505            &StreamAwareMockExecutor,
4506            None,
4507            Some(&sink),
4508            &YamlWorkflowRunOptions::default(),
4509        )
4510        .await
4511        .expect("workflow should execute");
4512
4513        let events = sink
4514            .events
4515            .lock()
4516            .expect("recording sink lock should not be poisoned");
4517        let completed = events
4518            .iter()
4519            .find(|event| event.event_type == "workflow_completed")
4520            .expect("expected workflow_completed event");
4521        let metadata = completed
4522            .metadata
4523            .as_ref()
4524            .expect("workflow_completed should include metadata by default");
4525        let nerdstats = metadata
4526            .get("nerdstats")
4527            .expect("nerdstats should be present by default");
4528
4529        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
4530        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
4531        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
4532    }
4533
4534    #[test]
4535    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
4536        let output = YamlWorkflowRunOutput {
4537            workflow_id: "workflow".to_string(),
4538            entry_node: "start".to_string(),
4539            email_text: "hello".to_string(),
4540            trace: vec!["llm_node".to_string()],
4541            outputs: BTreeMap::new(),
4542            terminal_node: "llm_node".to_string(),
4543            terminal_output: None,
4544            step_timings: vec![YamlStepTiming {
4545                node_id: "llm_node".to_string(),
4546                node_kind: "llm_call".to_string(),
4547                elapsed_ms: 100,
4548                prompt_tokens: None,
4549                completion_tokens: None,
4550                total_tokens: None,
4551                thinking_tokens: None,
4552                tokens_per_second: None,
4553            }],
4554            llm_node_metrics: BTreeMap::new(),
4555            total_elapsed_ms: 100,
4556            ttft_ms: None,
4557            total_input_tokens: 0,
4558            total_output_tokens: 0,
4559            total_tokens: 0,
4560            total_thinking_tokens: None,
4561            tokens_per_second: 0.0,
4562            trace_id: Some("trace-1".to_string()),
4563            metadata: None,
4564        };
4565
4566        let nerdstats = workflow_nerdstats(&output);
4567        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
4568        assert_eq!(
4569            nerdstats["token_metrics_source"],
4570            Value::String("provider_stream_usage_unavailable".to_string())
4571        );
4572        assert_eq!(nerdstats["total_tokens"], Value::Null);
4573        assert_eq!(nerdstats["ttft_ms"], Value::Null);
4574    }
4575
4576    #[test]
4577    fn workflow_nerdstats_includes_ttft_when_available() {
4578        let output = YamlWorkflowRunOutput {
4579            workflow_id: "workflow".to_string(),
4580            entry_node: "start".to_string(),
4581            email_text: "hello".to_string(),
4582            trace: vec!["llm_node".to_string()],
4583            outputs: BTreeMap::new(),
4584            terminal_node: "llm_node".to_string(),
4585            terminal_output: None,
4586            step_timings: vec![YamlStepTiming {
4587                node_id: "llm_node".to_string(),
4588                node_kind: "llm_call".to_string(),
4589                elapsed_ms: 100,
4590                prompt_tokens: Some(10),
4591                completion_tokens: Some(15),
4592                total_tokens: Some(25),
4593                thinking_tokens: None,
4594                tokens_per_second: Some(150.0),
4595            }],
4596            llm_node_metrics: BTreeMap::new(),
4597            total_elapsed_ms: 100,
4598            ttft_ms: Some(42),
4599            total_input_tokens: 10,
4600            total_output_tokens: 15,
4601            total_tokens: 25,
4602            total_thinking_tokens: None,
4603            tokens_per_second: 150.0,
4604            trace_id: Some("trace-2".to_string()),
4605            metadata: None,
4606        };
4607
4608        let nerdstats = workflow_nerdstats(&output);
4609        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
4610    }
4611
4612    struct MessageHistoryExecutor;
4613
4614    #[async_trait]
4615    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
4616        async fn complete_structured(
4617            &self,
4618            request: YamlLlmExecutionRequest,
4619            _event_sink: Option<&dyn YamlWorkflowEventSink>,
4620        ) -> Result<YamlLlmExecutionResult, String> {
4621            let messages = request
4622                .messages
4623                .ok_or_else(|| "expected messages in request".to_string())?;
4624            if messages.len() != 2 {
4625                return Err(format!("expected 2 messages, got {}", messages.len()));
4626            }
4627            Ok(YamlLlmExecutionResult {
4628                payload: json!({"category":"termination","reason":"history"}),
4629                usage: Some(YamlLlmTokenUsage {
4630                    prompt_tokens: 7,
4631                    completion_tokens: 3,
4632                    total_tokens: 10,
4633                    thinking_tokens: None,
4634                }),
4635                ttft_ms: None,
4636                tool_calls: Vec::new(),
4637            })
4638        }
4639    }
4640
4641    #[tokio::test]
4642    async fn supports_messages_path_in_workflow_input() {
4643        let yaml = r#"
4644id: email-intake-classification
4645entry_node: classify_top_level
4646nodes:
4647  - id: classify_top_level
4648    node_type:
4649      llm_call:
4650        model: gpt-4.1
4651        messages_path: input.messages
4652        append_prompt_as_user: false
4653"#;
4654
4655        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4656        let input = json!({
4657            "email_text": "ignored",
4658            "messages": [
4659                {"role": "system", "content": "You are a classifier"},
4660                {"role": "user", "content": "Please classify this"}
4661            ]
4662        });
4663
4664        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
4665            .await
4666            .expect("workflow should use chat history from input");
4667
4668        assert_eq!(output.terminal_node, "classify_top_level");
4669        assert_eq!(
4670            output.outputs["classify_top_level"]["output"]["reason"],
4671            Value::String("history".to_string())
4672        );
4673    }
4674
4675    #[tokio::test]
4676    async fn wrapper_entrypoints_produce_equivalent_outputs() {
4677        let yaml = r#"
4678id: wrapper-equivalence
4679entry_node: classify
4680nodes:
4681  - id: classify
4682    node_type:
4683      llm_call:
4684        model: gpt-4.1
4685    config:
4686      prompt: |
4687        Classify this email into exactly one category:
4688        {{ input.email_text }}
4689"#;
4690
4691        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4692        let input = json!({"email_text":"hello"});
4693
4694        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
4695            .await
4696            .expect("base entrypoint should execute");
4697        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
4698            .await
4699            .expect("custom worker wrapper should execute");
4700        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
4701            &workflow,
4702            &input,
4703            &MockExecutor,
4704            None,
4705            None,
4706            &YamlWorkflowRunOptions::default(),
4707        )
4708        .await
4709        .expect("events/options wrapper should execute");
4710
4711        assert_eq!(a.workflow_id, b.workflow_id);
4712        assert_eq!(a.workflow_id, c.workflow_id);
4713        assert_eq!(a.terminal_node, b.terminal_node);
4714        assert_eq!(a.terminal_node, c.terminal_node);
4715        assert_eq!(a.outputs, b.outputs);
4716        assert_eq!(a.outputs, c.outputs);
4717        assert_eq!(a.total_tokens, b.total_tokens);
4718        assert_eq!(a.total_tokens, c.total_tokens);
4719    }
4720
4721    #[tokio::test]
4722    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
4723        let yaml = r#"
4724id: tool-calling-workflow
4725entry_node: generate_with_tool
4726nodes:
4727  - id: generate_with_tool
4728    node_type:
4729      llm_call:
4730        model: gpt-4.1
4731        tools_format: simplified
4732        max_tool_roundtrips: 1
4733        tool_calls_global_key: audit
4734        tools:
4735          - name: get_customer_context
4736            description: Fetch customer context
4737            input_schema:
4738              type: object
4739              properties:
4740                order_id: { type: string }
4741              required: [order_id]
4742              additionalProperties: false
4743            output_schema:
4744              type: object
4745              properties:
4746                customer_name: { type: string }
4747              required: [customer_name]
4748              additionalProperties: false
4749    config:
4750      output_schema:
4751        type: object
4752        properties:
4753          state: { type: string }
4754        required: [state]
4755  - id: personalize
4756    node_type:
4757      llm_call:
4758        model: gpt-4.1
4759    config:
4760      prompt: |
4761        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
4762      output_schema:
4763        type: object
4764        properties:
4765          subject: { type: string }
4766          body: { type: string }
4767        required: [subject, body]
4768edges:
4769  - from: generate_with_tool
4770    to: personalize
4771"#;
4772
4773        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4774        let client = SimpleAgentsClientBuilder::new()
4775            .with_provider(Arc::new(ToolLoopProvider))
4776            .build()
4777            .expect("client should build");
4778        let worker = FixedToolWorker {
4779            payload: json!({"customer_name": "Ava"}),
4780        };
4781
4782        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4783            &workflow,
4784            &json!({"email_text":"hello"}),
4785            &client,
4786            Some(&worker),
4787            None,
4788            &YamlWorkflowRunOptions::default(),
4789        )
4790        .await
4791        .expect("workflow should execute");
4792
4793        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
4794        assert_eq!(
4795            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
4796            Value::String("Ava".to_string())
4797        );
4798        let body = output.outputs["personalize"]["output"]["body"]
4799            .as_str()
4800            .expect("body should be string");
4801        assert!(body.contains("Ava"));
4802    }
4803
4804    #[tokio::test]
4805    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
4806        let yaml = r#"
4807id: tool-calling-schema-fail
4808entry_node: generate_with_tool
4809nodes:
4810  - id: generate_with_tool
4811    node_type:
4812      llm_call:
4813        model: gpt-4.1
4814        tools_format: simplified
4815        max_tool_roundtrips: 1
4816        tools:
4817          - name: get_customer_context
4818            input_schema:
4819              type: object
4820              properties:
4821                order_id: { type: string }
4822              required: [order_id]
4823              additionalProperties: false
4824            output_schema:
4825              type: object
4826              properties:
4827                customer_name: { type: string }
4828              required: [customer_name]
4829              additionalProperties: false
4830    config:
4831      output_schema:
4832        type: object
4833        properties:
4834          state: { type: string }
4835        required: [state]
4836"#;
4837
4838        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4839        let client = SimpleAgentsClientBuilder::new()
4840            .with_provider(Arc::new(ToolLoopProvider))
4841            .build()
4842            .expect("client should build");
4843        let worker = FixedToolWorker {
4844            payload: json!({"unexpected": "shape"}),
4845        };
4846
4847        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4848            &workflow,
4849            &json!({"email_text":"hello"}),
4850            &client,
4851            Some(&worker),
4852            None,
4853            &YamlWorkflowRunOptions::default(),
4854        )
4855        .await
4856        .expect_err("workflow should hard-fail on schema mismatch");
4857
4858        match error {
4859            YamlWorkflowRunError::Llm { message, .. } => {
4860                assert!(message.contains("output failed schema validation"));
4861            }
4862            other => panic!("expected llm error, got {other:?}"),
4863        }
4864    }
4865
4866    #[test]
4867    fn validates_tools_format_mismatch() {
4868        let yaml = r#"
4869id: mismatch
4870entry_node: generate
4871nodes:
4872  - id: generate
4873    node_type:
4874      llm_call:
4875        model: gpt-4.1
4876        tools_format: openai
4877        tools:
4878          - name: get_customer_context
4879            input_schema:
4880              type: object
4881              properties:
4882                order_id: { type: string }
4883              required: [order_id]
4884"#;
4885
4886        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4887        let diagnostics = verify_yaml_workflow(&workflow);
4888        assert!(diagnostics
4889            .iter()
4890            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
4891    }
4892
4893    #[tokio::test]
4894    async fn custom_worker_receives_trace_context_block() {
4895        let yaml = r#"
4896id: custom-worker-trace-context
4897entry_node: lookup
4898nodes:
4899  - id: lookup
4900    node_type:
4901      custom_worker:
4902        handler: GetRagData
4903    config:
4904      payload:
4905        topic: demo
4906"#;
4907
4908        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4909        let worker = CapturingWorker {
4910            context: Mutex::new(None),
4911        };
4912        let options = YamlWorkflowRunOptions {
4913            trace: YamlWorkflowTraceOptions {
4914                context: Some(YamlWorkflowTraceContextInput {
4915                    trace_id: Some("trace-fixed-ctx".to_string()),
4916                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
4917                    ..YamlWorkflowTraceContextInput::default()
4918                }),
4919                tenant: YamlWorkflowTraceTenantContext {
4920                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
4921                    request_id: Some("turn-7".to_string()),
4922                    ..YamlWorkflowTraceTenantContext::default()
4923                },
4924            },
4925            ..YamlWorkflowRunOptions::default()
4926        };
4927
4928        run_workflow_yaml_with_custom_worker_and_events_and_options(
4929            &workflow,
4930            &json!({"email_text":"hello"}),
4931            &MockExecutor,
4932            Some(&worker),
4933            None,
4934            &options,
4935        )
4936        .await
4937        .expect("workflow should execute");
4938
4939        let captured_context = worker
4940            .context
4941            .lock()
4942            .expect("capturing worker lock should not be poisoned")
4943            .clone()
4944            .expect("custom worker should receive context");
4945
4946        assert_eq!(
4947            captured_context
4948                .get("trace")
4949                .and_then(|trace| trace.get("context"))
4950                .and_then(|context| context.get("trace_id"))
4951                .and_then(Value::as_str),
4952            Some("trace-fixed-ctx")
4953        );
4954        assert_eq!(
4955            captured_context
4956                .get("trace")
4957                .and_then(|trace| trace.get("context"))
4958                .and_then(|context| context.get("traceparent"))
4959                .and_then(Value::as_str),
4960            Some("00-trace-fixed-ctx-span-fixed-01")
4961        );
4962        assert_eq!(
4963            captured_context
4964                .get("trace")
4965                .and_then(|trace| trace.get("tenant"))
4966                .and_then(|tenant| tenant.get("conversation_id"))
4967                .and_then(Value::as_str),
4968            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
4969        );
4970    }
4971
4972    #[tokio::test]
4973    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
4974        let yaml = r#"
4975id: cancellation-test
4976entry_node: classify
4977nodes:
4978  - id: classify
4979    node_type:
4980      llm_call:
4981        model: gpt-4.1
4982    config:
4983      prompt: |
4984        Classify this email into exactly one category:
4985        {{ input.email_text }}
4986"#;
4987
4988        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4989        let executor = CountingExecutor {
4990            call_count: AtomicUsize::new(0),
4991        };
4992        let sink = CancelAfterFirstEventSink {
4993            cancelled: AtomicBool::new(false),
4994        };
4995
4996        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
4997            &workflow,
4998            &json!({"email_text":"hello"}),
4999            &executor,
5000            None,
5001            Some(&sink),
5002            &YamlWorkflowRunOptions::default(),
5003        )
5004        .await
5005        .expect_err("workflow should stop when sink signals cancellation");
5006
5007        assert!(matches!(
5008            err,
5009            YamlWorkflowRunError::EventSinkCancelled { .. }
5010        ));
5011        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
5012    }
5013
5014    #[tokio::test]
5015    async fn rejects_invalid_messages_path_shape() {
5016        let yaml = r#"
5017id: email-intake-classification
5018entry_node: classify_top_level
5019nodes:
5020  - id: classify_top_level
5021    node_type:
5022      llm_call:
5023        model: gpt-4.1
5024        messages_path: input.messages
5025"#;
5026
5027        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5028        let input = json!({
5029            "email_text": "ignored",
5030            "messages": "not-a-list"
5031        });
5032
5033        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
5034            .await
5035            .expect_err("workflow should fail for invalid messages shape");
5036
5037        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
5038    }
5039
5040    #[test]
5041    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
5042        let yaml = r#"
5043id: chat-workflow
5044entry_node: decide
5045nodes:
5046  - id: decide
5047    node_type:
5048      switch:
5049        branches:
5050          - condition: '$.input.mode == "draft"'
5051            target: draft
5052        default: ask
5053  - id: draft
5054    node_type:
5055      llm_call:
5056        model: gpt-4.1
5057  - id: ask
5058    node_type:
5059      llm_call:
5060        model: gpt-4.1
5061edges:
5062  - from: draft
5063    to: ask
5064"#;
5065
5066        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5067        let mermaid = yaml_workflow_to_mermaid(&workflow);
5068
5069        assert!(mermaid.contains("flowchart TD"));
5070        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
5071        assert!(mermaid.contains("decide -- \"default\" --> ask"));
5072        assert!(mermaid.contains("draft --> ask"));
5073    }
5074
5075    #[test]
5076    fn converts_yaml_workflow_to_ir_definition() {
5077        let yaml = r#"
5078id: chat-workflow
5079entry_node: classify
5080nodes:
5081  - id: classify
5082    node_type:
5083      llm_call:
5084        model: gpt-4.1
5085    config:
5086      prompt: |
5087        classify
5088  - id: route
5089    node_type:
5090      switch:
5091        branches:
5092          - condition: '$.nodes.classify.output.kind == "x"'
5093            target: done
5094        default: done
5095  - id: done
5096    node_type:
5097      custom_worker:
5098        handler: GetRagData
5099    config:
5100      payload:
5101        topic: test
5102edges:
5103  - from: classify
5104    to: route
5105"#;
5106
5107        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5108        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
5109
5110        assert_eq!(ir.name, "chat-workflow");
5111        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
5112        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
5113        assert!(ir.nodes.iter().any(|n| n.id == "route"));
5114        assert!(ir.nodes.iter().any(|n| n.id == "done"));
5115    }
5116
5117    #[test]
5118    fn supports_yaml_to_ir_when_messages_path_is_used() {
5119        let yaml = r#"
5120id: chat-workflow
5121entry_node: classify
5122nodes:
5123  - id: classify
5124    node_type:
5125      llm_call:
5126        model: gpt-4.1
5127        messages_path: input.messages
5128"#;
5129
5130        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5131        let ir =
5132            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
5133        assert!(ir.nodes.iter().any(|node| matches!(
5134            node.kind,
5135            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
5136        )));
5137    }
5138
5139    #[tokio::test]
5140    async fn workflow_output_contains_trace_id_in_both_locations() {
5141        let yaml = r#"
5142id: trace-test
5143entry_node: classify
5144nodes:
5145  - id: classify
5146    node_type:
5147      llm_call:
5148        model: gpt-4.1
5149    config:
5150      prompt: |
5151        Classify this email into exactly one category:
5152        {{ input.email_text }}
5153"#;
5154
5155        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5156        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
5157            .await
5158            .expect("workflow should execute");
5159
5160        let trace_id = output
5161            .trace_id
5162            .as_deref()
5163            .expect("trace_id should be present");
5164        assert!(!trace_id.is_empty());
5165        assert_eq!(
5166            output.metadata.as_ref().and_then(|value| {
5167                value
5168                    .get("telemetry")
5169                    .and_then(|telemetry| telemetry.get("trace_id"))
5170                    .and_then(Value::as_str)
5171            }),
5172            Some(trace_id)
5173        );
5174    }
5175
5176    #[tokio::test]
5177    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
5178        let yaml = r#"
5179id: trace-options-test
5180entry_node: classify
5181nodes:
5182  - id: classify
5183    node_type:
5184      llm_call:
5185        model: gpt-4.1
5186    config:
5187      prompt: |
5188        Classify this email into exactly one category:
5189        {{ input.email_text }}
5190"#;
5191
5192        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5193        let options = YamlWorkflowRunOptions {
5194            telemetry: YamlWorkflowTelemetryConfig {
5195                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
5196                ..YamlWorkflowTelemetryConfig::default()
5197            },
5198            trace: YamlWorkflowTraceOptions {
5199                context: Some(YamlWorkflowTraceContextInput {
5200                    trace_id: Some("trace-fixed-123".to_string()),
5201                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
5202                    ..YamlWorkflowTraceContextInput::default()
5203                }),
5204                tenant: YamlWorkflowTraceTenantContext {
5205                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
5206                    ..YamlWorkflowTraceTenantContext::default()
5207                },
5208            },
5209        };
5210
5211        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5212            &workflow,
5213            &json!({"email_text":"hello"}),
5214            &MockExecutor,
5215            None,
5216            None,
5217            &options,
5218        )
5219        .await
5220        .expect("workflow should execute");
5221
5222        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
5223        assert_eq!(
5224            output
5225                .metadata
5226                .as_ref()
5227                .and_then(|value| value.get("telemetry"))
5228                .and_then(|telemetry| telemetry.get("payload_mode"))
5229                .and_then(Value::as_str),
5230            Some("redacted_payload")
5231        );
5232        assert_eq!(
5233            output
5234                .metadata
5235                .as_ref()
5236                .and_then(|value| value.get("trace"))
5237                .and_then(|trace| trace.get("tenant"))
5238                .and_then(|tenant| tenant.get("conversation_id"))
5239                .and_then(Value::as_str),
5240            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
5241        );
5242    }
5243
5244    #[test]
5245    fn streamed_payload_parser_extracts_last_json_object() {
5246        let raw = r#"{"state":"missing_scenario","reason":"ok"}
5247
5248extra reasoning text
5249
5250{"state":"ready","reason":"final"}"#;
5251
5252        let resolved = parse_streamed_structured_payload(raw, false)
5253            .expect("parser should extract final JSON object");
5254        assert_eq!(resolved.payload["state"], "ready");
5255        assert!(resolved.heal_confidence.is_none());
5256    }
5257
5258    #[test]
5259    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
5260        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
5261
5262        let resolved = parse_streamed_structured_payload(raw, false)
5263            .expect("parser should recover final structured JSON object");
5264        assert_eq!(resolved.payload["state"], "ready");
5265    }
5266
5267    #[test]
5268    fn streamed_payload_parser_handles_markdown_with_heal() {
5269        let raw = r#"Some preface
5270```json
5271{
5272  "state": "missing_scenario",
5273  "reason": "Need more details"
5274}
5275```
5276Some trailing explanation"#;
5277
5278        let resolved = parse_streamed_structured_payload(raw, true)
5279            .expect("heal path should parse JSON block");
5280        assert_eq!(resolved.payload["state"], "missing_scenario");
5281        assert!(resolved.heal_confidence.is_some());
5282    }
5283
5284    #[test]
5285    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
5286        let raw = "No JSON in this streamed output";
5287        let error = parse_streamed_structured_payload(raw, false)
5288            .expect_err("strict stream parse should fail without JSON candidate");
5289        assert!(error.contains("no JSON object candidate found"));
5290    }
5291
5292    #[test]
5293    fn include_raw_stream_debug_events_defaults_to_false() {
5294        let _guard = stream_debug_env_lock()
5295            .lock()
5296            .expect("stream debug env lock should not be poisoned");
5297        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5298        assert!(!include_raw_stream_debug_events());
5299    }
5300
5301    #[test]
5302    fn include_raw_stream_debug_events_accepts_truthy_values() {
5303        let _guard = stream_debug_env_lock()
5304            .lock()
5305            .expect("stream debug env lock should not be poisoned");
5306        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
5307        assert!(include_raw_stream_debug_events());
5308        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5309    }
5310
5311    #[test]
5312    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
5313        let mut filter = StructuredJsonDeltaFilter::default();
5314        let chunks = vec![
5315            "I will think first... ",
5316            "{\"state\":\"missing_scenario\",",
5317            "\"reason\":\"Need more details\"}",
5318            " additional commentary",
5319        ];
5320
5321        let filtered = chunks
5322            .into_iter()
5323            .filter_map(|chunk| filter.split(chunk).0)
5324            .collect::<String>();
5325
5326        assert_eq!(
5327            filtered,
5328            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
5329        );
5330    }
5331
5332    #[test]
5333    fn structured_json_delta_filter_handles_braces_inside_strings() {
5334        let mut filter = StructuredJsonDeltaFilter::default();
5335        let chunks = vec![
5336            "preface ",
5337            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
5338            " trailing",
5339        ];
5340
5341        let filtered = chunks
5342            .into_iter()
5343            .filter_map(|chunk| filter.split(chunk).0)
5344            .collect::<String>();
5345
5346        assert_eq!(
5347            filtered,
5348            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
5349        );
5350    }
5351
5352    #[test]
5353    fn render_json_object_as_text_converts_top_level_fields() {
5354        let rendered =
5355            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
5356        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
5357
5358        assert_eq!(lines.len(), 3);
5359        assert!(lines.contains("question: q"));
5360        assert!(lines.contains("confidence: 0.8"));
5361        assert!(lines.contains("nested: {\"a\":1}"));
5362    }
5363
5364    #[test]
5365    fn stream_json_as_text_formatter_emits_once_when_complete() {
5366        let mut formatter = StreamJsonAsTextFormatter::default();
5367        formatter.push("{\"question\":\"hello\"}");
5368
5369        let first = formatter.emit_if_ready(true);
5370        let second = formatter.emit_if_ready(true);
5371
5372        assert_eq!(first, Some("question: hello".to_string()));
5373        assert_eq!(second, None);
5374    }
5375
5376    #[test]
5377    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
5378        let expr = "$.nodes.classify.output.output_total == 1";
5379        let rewritten = rewrite_yaml_condition_to_ir(expr);
5380        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
5381    }
5382
5383    #[tokio::test]
5384    async fn validates_workflow_input_before_ir_runtime_path() {
5385        let yaml = r#"
5386id: chat-workflow
5387entry_node: classify
5388nodes:
5389  - id: classify
5390    node_type:
5391      llm_call:
5392        model: gpt-4.1
5393    config:
5394      prompt: |
5395        classify
5396"#;
5397
5398        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5399        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
5400            .await
5401            .expect_err("non-object input should fail before execution");
5402
5403        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5404    }
5405
5406    #[test]
5407    fn interpolate_template_supports_dollar_prefixed_paths() {
5408        let context = json!({
5409            "input": {
5410                "email_text": "hello"
5411            }
5412        });
5413
5414        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
5415        assert_eq!(rendered, "value=hello");
5416    }
5417}