Skip to main content

simple_agents_workflow/
yaml_runner.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceMode, ToolChoiceTool, ToolDefinition, ToolType,
14};
15use simple_agents_core::{
16    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
17};
18use simple_agents_healing::JsonishParser;
19use thiserror::Error;
20
21use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
22use crate::observability::tracing::{
23    flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
24};
25use crate::runtime::{
26    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
27    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
28    WorkflowRuntimeOptions,
29};
30use crate::validation::validate_and_normalize;
31use crate::visualize::workflow_to_mermaid;
32
33mod runner;
34pub use runner::WorkflowRunner;
35mod api;
36mod client_executor;
37mod context;
38mod execute;
39mod globals;
40mod llm_tools;
41mod validation;
42pub use api::{
43    run_email_workflow_yaml, run_email_workflow_yaml_file,
44    run_email_workflow_yaml_file_with_client,
45    run_email_workflow_yaml_file_with_client_and_custom_worker,
46    run_email_workflow_yaml_file_with_client_and_custom_worker_and_events,
47    run_email_workflow_yaml_with_client, run_email_workflow_yaml_with_client_and_custom_worker,
48    run_email_workflow_yaml_with_client_and_custom_worker_and_events,
49    run_email_workflow_yaml_with_custom_worker,
50    run_email_workflow_yaml_with_custom_worker_and_events, run_workflow_yaml,
51    run_workflow_yaml_file, run_workflow_yaml_file_with_client,
52    run_workflow_yaml_file_with_client_and_custom_worker,
53    run_workflow_yaml_file_with_client_and_custom_worker_and_events,
54    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options,
55    run_workflow_yaml_with_client, run_workflow_yaml_with_client_and_custom_worker,
56    run_workflow_yaml_with_client_and_custom_worker_and_events,
57    run_workflow_yaml_with_custom_worker, run_workflow_yaml_with_custom_worker_and_events,
58};
59use client_executor::BorrowedClientExecutor;
60use context::{
61    build_yaml_context_from_ir_scope, collect_template_bindings, evaluate_switch_condition,
62    interpolate_template, json_type_name, parse_messages_from_context, resolve_path,
63};
64use globals::{apply_set_globals, apply_update_globals};
65use llm_tools::{
66    default_llm_output_schema, llm_output_schema_for_node, normalize_llm_tools,
67    normalize_tool_choice,
68};
69pub use validation::verify_yaml_workflow;
70
// Reserved identifiers with a `__yaml_` prefix. Their use sites are outside this
// part of the file; presumably the synthetic entry node and the built-in LLM-call
// tool — TODO confirm against the runner.
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";
// 1 MiB. Presumably the size limit applied to workflow YAML input — confirm at the call site.
const MAX_WORKFLOW_YAML_BYTES: u64 = 1024 * 1024;
// Presumably the nesting-depth limit applied to workflow YAML — confirm at the call site.
const MAX_WORKFLOW_YAML_DEPTH: usize = 64;

// Process-wide counter consumed by `generate_trace_id`; starts at 1 and is bumped on every call.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
77
/// Timing and token statistics recorded for a single workflow step.
///
/// Instances are collected in [`YamlWorkflowRunOutput::step_timings`] and are
/// serialized as `step_details` by `workflow_nerdstats`. Every `Option` field is
/// omitted from the serialized form when it is `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Identifier of the node this entry describes.
    pub node_id: String,
    /// Kind label of the node; `workflow_nerdstats` treats `"llm_call"` as an LLM step.
    pub node_kind: String,
    /// Model name for the step, when one applies.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Elapsed time for the step, in milliseconds.
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    /// `None` on an `"llm_call"` step is interpreted by `workflow_nerdstats` as
    /// "the provider reported no usage for this node".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
96
/// Token and timing metrics for one LLM node.
///
/// Stored per node in [`YamlWorkflowRunOutput::llm_node_metrics`]. Unlike
/// [`YamlStepTiming`], the token counts here are not optional (only
/// `reasoning_tokens` is).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Elapsed time in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
107
/// Result of running a YAML workflow: node outputs, timings, and token totals.
///
/// `Option` fields marked `skip_serializing_if` are left out of the serialized
/// form when `None`; `terminal_output` is always serialized.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    // NOTE(review): presumably the input text for the `run_email_workflow_*` entry
    // points; how it is populated for other workflows is not visible here.
    pub email_text: String,
    // NOTE(review): looks like the ordered list of visited node ids — confirm.
    pub trace: Vec<String>,
    /// Output values keyed by string (presumably node id — confirm).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    /// Output of the terminal node, if any; exported as the Langfuse trace output.
    pub terminal_output: Option<Value>,
    /// Per-step timing entries; serialized as `step_details` in the nerdstats payload.
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    pub llm_node_models: BTreeMap<String, String>,
    /// Total elapsed time in milliseconds.
    pub total_elapsed_ms: u128,
    /// Time to first token in milliseconds, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    pub tokens_per_second: f64,
    /// Trace id associated with the run, when telemetry resolved one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Free-form metadata (see `workflow_metadata_with_trace` for one producer).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
134
/// Controls how payloads are rendered into span attributes (see `payload_for_span`).
///
/// Serialized in `snake_case` (`full_payload`, `redacted_payload`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    /// The payload is emitted as its full JSON text. This is the default.
    #[default]
    FullPayload,
    /// The payload is replaced by `{"redacted": true, "value_type": ...}`.
    RedactedPayload,
}
142
/// Controls how tool payloads are recorded in traces (see `payload_for_tool_trace`).
///
/// Serialized in `snake_case` (`full`, `redacted`, `off`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    /// The payload is recorded unchanged. This is the default.
    #[default]
    Full,
    /// The payload is replaced by a redaction marker carrying only its JSON type name.
    Redacted,
    /// The payload is recorded as JSON `null`.
    Off,
}
151
/// Caller-supplied trace context for a workflow run.
///
/// Mirrors the fields of `TraceContext` one-to-one (see
/// `trace_context_from_options`). Every field is optional when deserializing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    /// Explicit trace id; has the highest priority when resolving the run's trace id.
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// `traceparent` header value; its trace-id field is used when no explicit or
    /// parent trace id is available.
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// Key/value baggage forwarded to workers when the span context carries none.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
167
/// Tenant identifiers attached to spans and forwarded to custom workers.
///
/// Each present field becomes a `tenant.*` span attribute; `user_id` and
/// `conversation_id` are additionally exported under the `user.id` /
/// `langfuse.user.id` and `session.id` / `langfuse.session.id` keys (see
/// `apply_trace_tenant_attributes_from_tenant`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
181
/// Telemetry settings for a workflow run.
///
/// Missing fields deserialize to the same values produced by the `Default` impl.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// When `false`, no trace id is resolved for the run. Defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,
    // NOTE(review): presumably gates the nerdstats span attributes — confirm at the call site.
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Fraction of traces to sample; `validate_sample_rate` accepts finite values
    /// in `0.0..=1.0`. Defaults to `1.0`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    /// How payloads are rendered into span attributes.
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    /// Defaults to `30`. Within this part of the file it is only echoed into run metadata.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    /// Defaults to `true`. Within this part of the file it is only echoed into run metadata.
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    /// How tool payloads are recorded in traces.
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
199
200impl Default for YamlWorkflowTelemetryConfig {
201    fn default() -> Self {
202        Self {
203            enabled: true,
204            nerdstats: true,
205            sample_rate: 1.0,
206            payload_mode: YamlWorkflowPayloadMode::FullPayload,
207            retention_days: 30,
208            multi_tenant: true,
209            tool_trace_mode: YamlToolTraceMode::Full,
210        }
211    }
212}
213
/// Trace-related inputs for a workflow run.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    /// Optional caller-supplied trace context (ids, `traceparent`, baggage).
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    /// Tenant identifiers recorded on spans and in run metadata.
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
221
/// Per-run options: telemetry settings, trace inputs, and an optional model override.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    // NOTE(review): presumably the run-level override passed to
    // `resolve_requested_model`, where a blank value falls back to the node's
    // own model — confirm at the call site.
    #[serde(default)]
    pub model: Option<String>,
}
231
/// Token usage for a single LLM call.
///
/// Exported to spans by `apply_langfuse_observation_usage_attributes` and summed
/// into run totals by `YamlTokenTotals::add_usage`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning token count, when one is available.
    pub reasoning_tokens: Option<u32>,
}
239
/// Outcome of one LLM execution: the payload plus optional usage, latency, and tool traces.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    /// JSON payload of the execution.
    pub payload: Value,
    /// Token usage, when available.
    pub usage: Option<YamlLlmTokenUsage>,
    /// Time to first token in milliseconds, when measured.
    pub ttft_ms: Option<u128>,
    /// Trace entries for tool calls associated with this execution.
    pub tool_calls: Vec<YamlToolCallTrace>,
}
247
/// Trace record for a single tool call.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    /// Arguments of the call as JSON.
    // NOTE(review): presumably already filtered through `payload_for_tool_trace` — confirm.
    pub arguments: Value,
    /// Output of the call, if any.
    pub output: Option<Value>,
    /// Status label; the set of values is not defined in this part of the file.
    pub status: String,
    /// Elapsed time in milliseconds.
    pub elapsed_ms: u128,
    /// Error text, if any.
    pub error: Option<String>,
}
258
259#[derive(Debug, Clone, Default)]
260struct YamlTokenTotals {
261    input_tokens: u64,
262    output_tokens: u64,
263    total_tokens: u64,
264    reasoning_tokens: Option<u64>,
265}
266
267impl YamlTokenTotals {
268    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
269        self.input_tokens += u64::from(usage.prompt_tokens);
270        self.output_tokens += u64::from(usage.completion_tokens);
271        self.total_tokens += u64::from(usage.total_tokens);
272
273        if let Some(reasoning_tokens) = usage.reasoning_tokens {
274            let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
275            self.reasoning_tokens = Some(next);
276        }
277    }
278
279    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
280        if elapsed_ms == 0 {
281            return 0.0;
282        }
283        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
284    }
285}
286
/// Rounds `value` to two decimal places (scale by 100, round, scale back).
fn round_two_decimals(value: f64) -> f64 {
    let hundredths = f64::round(value * 100.0);
    hundredths / 100.0
}
290
291fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
292    if elapsed_ms == 0 {
293        return 0.0;
294    }
295    round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
296}
297
/// Picks the model to request for a node.
///
/// A run-level override wins when it is non-blank after trimming (the trimmed
/// text is used); otherwise `node_model` is returned unchanged.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    let chosen = match run_model_override.map(str::trim) {
        Some(candidate) if !candidate.is_empty() => candidate,
        _ => node_model,
    };
    chosen.to_string()
}
310
/// Serde default for the boolean telemetry flags (`enabled`, `nerdstats`, `multi_tenant`).
fn default_true() -> bool {
    true
}
314
/// Serde default for `YamlWorkflowTelemetryConfig::sample_rate`: sample every trace.
fn default_sample_rate() -> f32 {
    1.0
}
318
/// Serde default for `YamlWorkflowTelemetryConfig::retention_days`.
fn default_retention_days() -> u32 {
    30
}
322
323fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
324    if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
325        return Ok(());
326    }
327
328    Err(YamlWorkflowRunError::InvalidInput {
329        message: format!(
330            "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
331        ),
332    })
333}
334
/// Decides deterministically whether a trace is sampled.
///
/// Rates `>= 1.0` always sample and rates `<= 0.0` never do. Otherwise the
/// trace id's bytes are hashed (64-bit FNV-1a style: xor, then wrapping
/// multiply), the hash is mapped onto `[0, 1]`, and the trace is sampled when
/// that ratio is below `sample_rate`. The same id and rate always give the
/// same answer.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    let digest = trace_id.bytes().fold(OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    });

    let position = (digest as f64) / (u64::MAX as f64);
    position < f64::from(sample_rate)
}
352
/// Extracts the trace id from a W3C `traceparent` header value.
///
/// The value must consist of exactly four `-`-separated fields,
/// `version-trace_id-parent_id-flags`, and every field is validated so that a
/// malformed header is rejected as a whole rather than partially trusted:
///
/// * `version`: 2 hex digits, and not `ff` (forbidden by the specification);
/// * `trace_id`: 32 hex digits, not all zeros;
/// * `parent_id`: 16 hex digits, not all zeros;
/// * `flags`: 2 hex digits.
///
/// Upper-case hex digits are tolerated on input. Returns the trace id in
/// lower case, or `None` when the header is not well formed.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    /// True when `field` is exactly `len` ASCII hex digits.
    fn is_hex_of_len(field: &str, len: usize) -> bool {
        field.len() == len && field.bytes().all(|byte| byte.is_ascii_hexdigit())
    }

    /// True when every byte of `field` is the digit `0`.
    fn is_all_zero(field: &str) -> bool {
        field.bytes().all(|byte| byte == b'0')
    }

    let mut parts = traceparent.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if parts.next().is_some() {
        return None;
    }

    if !is_hex_of_len(version, 2) || version.eq_ignore_ascii_case("ff") {
        return None;
    }
    if !is_hex_of_len(trace_id, 32) || is_all_zero(trace_id) {
        return None;
    }
    // The parent-id and flags fields are not returned, but a header that
    // carries garbage in them is invalid and must not contribute a trace id.
    if !is_hex_of_len(span_id, 16) || is_all_zero(span_id) {
        return None;
    }
    if !is_hex_of_len(flags, 2) {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
379
/// Records where a run's trace id came from (see `resolve_run_trace_id_with_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry is disabled; no trace id was resolved.
    Disabled,
    /// Taken from `options.trace.context.trace_id`.
    ExplicitTraceId,
    /// Taken from the parent trace context's `trace_id`.
    ParentTraceId,
    /// Parsed from `options.trace.context.traceparent`.
    Traceparent,
    /// Parsed from the parent trace context's `traceparent`.
    ParentTraceparent,
    /// Freshly generated because no other source provided one.
    Generated,
}
389
/// Result of `resolve_telemetry_context`: the run's trace id and sampling decision.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    /// `None` when telemetry is disabled.
    trace_id: Option<String>,
    /// Sampling decision for `trace_id`; always `false` when there is no trace id.
    sampled: bool,
    /// Where `trace_id` came from.
    trace_id_source: TraceIdSource,
}
396
397fn resolve_run_trace_id_with_source(
398    options: &YamlWorkflowRunOptions,
399    parent_trace_context: Option<&TraceContext>,
400) -> (Option<String>, TraceIdSource) {
401    if !options.telemetry.enabled {
402        return (None, TraceIdSource::Disabled);
403    }
404
405    if let Some(trace_id) = options
406        .trace
407        .context
408        .as_ref()
409        .and_then(|context| context.trace_id.clone())
410    {
411        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
412    }
413
414    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
415        return (Some(trace_id), TraceIdSource::ParentTraceId);
416    }
417
418    if let Some(trace_id) = options
419        .trace
420        .context
421        .as_ref()
422        .and_then(|context| context.traceparent.as_deref())
423        .and_then(trace_id_from_traceparent)
424    {
425        return (Some(trace_id), TraceIdSource::Traceparent);
426    }
427
428    if let Some(trace_id) = parent_trace_context
429        .and_then(|context| context.traceparent.as_deref())
430        .and_then(trace_id_from_traceparent)
431    {
432        return (Some(trace_id), TraceIdSource::ParentTraceparent);
433    }
434
435    (Some(generate_trace_id()), TraceIdSource::Generated)
436}
437
438fn resolve_telemetry_context(
439    options: &YamlWorkflowRunOptions,
440    parent_trace_context: Option<&TraceContext>,
441) -> ResolvedTelemetryContext {
442    let (trace_id, trace_id_source) =
443        resolve_run_trace_id_with_source(options, parent_trace_context);
444    let sampled = trace_id
445        .as_deref()
446        .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
447        .unwrap_or(false);
448
449    ResolvedTelemetryContext {
450        trace_id,
451        sampled,
452        trace_id_source,
453    }
454}
455
456fn generate_trace_id() -> String {
457    use std::time::{SystemTime, UNIX_EPOCH};
458
459    let now_nanos = SystemTime::now()
460        .duration_since(UNIX_EPOCH)
461        .map(|duration| duration.as_nanos())
462        .unwrap_or(0);
463    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
464    format!("{:032x}", now_nanos ^ sequence)
465}
466
467fn workflow_metadata_with_trace(
468    options: &YamlWorkflowRunOptions,
469    trace_id: &str,
470    sampled: bool,
471    trace_id_source: TraceIdSource,
472) -> Value {
473    json!({
474        "telemetry": {
475            "trace_id": trace_id,
476            "trace_id_source": match trace_id_source {
477                TraceIdSource::Disabled => "disabled",
478                TraceIdSource::ExplicitTraceId => "explicit_trace_id",
479                TraceIdSource::ParentTraceId => "parent_trace_id",
480                TraceIdSource::Traceparent => "traceparent",
481                TraceIdSource::ParentTraceparent => "parent_traceparent",
482                TraceIdSource::Generated => "generated",
483            },
484            "enabled": options.telemetry.enabled,
485            "sampled": sampled,
486            "nerdstats": options.telemetry.nerdstats,
487            "sample_rate": options.telemetry.sample_rate,
488            "payload_mode": match options.telemetry.payload_mode {
489                YamlWorkflowPayloadMode::FullPayload => "full_payload",
490                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
491            },
492            "retention_days": options.telemetry.retention_days,
493            "multi_tenant": options.telemetry.multi_tenant,
494            "tool_trace_mode": match options.telemetry.tool_trace_mode {
495                YamlToolTraceMode::Full => "full",
496                YamlToolTraceMode::Redacted => "redacted",
497                YamlToolTraceMode::Off => "off",
498            },
499        },
500        "trace": {
501            "tenant": {
502                "workspace_id": options.trace.tenant.workspace_id,
503                "user_id": options.trace.tenant.user_id,
504                "conversation_id": options.trace.tenant.conversation_id,
505                "request_id": options.trace.tenant.request_id,
506                "run_id": options.trace.tenant.run_id,
507            }
508        },
509    })
510}
511
512fn apply_trace_identity_attributes(span: &mut dyn WorkflowSpan, trace_id: Option<&str>) {
513    if let Some(value) = trace_id {
514        span.set_attribute("trace_id", value);
515    }
516}
517
518fn apply_trace_tenant_attributes_from_tenant(
519    span: &mut dyn WorkflowSpan,
520    tenant: &YamlWorkflowTraceTenantContext,
521) {
522    if let Some(workspace_id) = tenant.workspace_id.as_deref() {
523        span.set_attribute("tenant.workspace_id", workspace_id);
524    }
525    if let Some(user_id) = tenant.user_id.as_deref() {
526        span.set_attribute("tenant.user_id", user_id);
527        span.set_attribute("user.id", user_id);
528        span.set_attribute("langfuse.user.id", user_id);
529    }
530    if let Some(conversation_id) = tenant.conversation_id.as_deref() {
531        span.set_attribute("tenant.conversation_id", conversation_id);
532        span.set_attribute("session.id", conversation_id);
533        span.set_attribute("langfuse.session.id", conversation_id);
534    }
535    if let Some(request_id) = tenant.request_id.as_deref() {
536        span.set_attribute("tenant.request_id", request_id);
537    }
538    if let Some(run_id) = tenant.run_id.as_deref() {
539        span.set_attribute("tenant.run_id", run_id);
540    }
541}
542
543fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
544    apply_trace_tenant_attributes_from_tenant(span, &options.trace.tenant);
545}
546
547fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
548    let llm_nodes_without_usage: Vec<String> = output
549        .step_timings
550        .iter()
551        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
552        .map(|step| step.node_id.clone())
553        .collect();
554    let token_metrics_available = llm_nodes_without_usage.is_empty();
555    let token_metrics_source = if token_metrics_available {
556        "provider_usage"
557    } else {
558        "provider_stream_usage_unavailable"
559    };
560    let total_input_tokens = if token_metrics_available {
561        json!(output.total_input_tokens)
562    } else {
563        Value::Null
564    };
565    let total_output_tokens = if token_metrics_available {
566        json!(output.total_output_tokens)
567    } else {
568        Value::Null
569    };
570    let total_tokens = if token_metrics_available {
571        json!(output.total_tokens)
572    } else {
573        Value::Null
574    };
575    let total_reasoning_tokens = if token_metrics_available {
576        json!(output.total_reasoning_tokens)
577    } else {
578        Value::Null
579    };
580    let tokens_per_second = if token_metrics_available {
581        json!(output.tokens_per_second)
582    } else {
583        Value::Null
584    };
585
586    json!({
587        "workflow_id": output.workflow_id,
588        "terminal_node": output.terminal_node,
589        "total_elapsed_ms": output.total_elapsed_ms,
590        "ttft_ms": output.ttft_ms,
591        "step_details": output.step_timings,
592        "total_input_tokens": total_input_tokens,
593        "total_output_tokens": total_output_tokens,
594        "total_tokens": total_tokens,
595        "total_reasoning_tokens": total_reasoning_tokens,
596        "tokens_per_second": tokens_per_second,
597        "trace_id": output.trace_id,
598        "token_metrics_available": token_metrics_available,
599        "token_metrics_source": token_metrics_source,
600        "llm_nodes_without_usage": llm_nodes_without_usage,
601    })
602}
603
604fn apply_langfuse_nerdstats_attributes(
605    span: &mut dyn WorkflowSpan,
606    output: &YamlWorkflowRunOutput,
607    enabled: bool,
608) {
609    if !enabled {
610        return;
611    }
612
613    let nerdstats = workflow_nerdstats(output);
614    let nerdstats_json = nerdstats.to_string();
615    span.set_attribute("langfuse.trace.metadata.nerdstats", nerdstats_json.as_str());
616
617    span.set_attribute(
618        "langfuse.trace.metadata.nerdstats.workflow_id",
619        output.workflow_id.as_str(),
620    );
621    span.set_attribute(
622        "langfuse.trace.metadata.nerdstats.terminal_node",
623        output.terminal_node.as_str(),
624    );
625    span.set_attribute(
626        "langfuse.trace.metadata.nerdstats.total_elapsed_ms",
627        output.total_elapsed_ms.to_string().as_str(),
628    );
629    span.set_attribute(
630        "langfuse.trace.metadata.nerdstats.step_details_count",
631        output.step_timings.len().to_string().as_str(),
632    );
633    span.set_attribute(
634        "langfuse.trace.metadata.nerdstats.total_input_tokens",
635        output.total_input_tokens.to_string().as_str(),
636    );
637    span.set_attribute(
638        "langfuse.trace.metadata.nerdstats.total_output_tokens",
639        output.total_output_tokens.to_string().as_str(),
640    );
641    span.set_attribute(
642        "langfuse.trace.metadata.nerdstats.total_tokens",
643        output.total_tokens.to_string().as_str(),
644    );
645    span.set_attribute(
646        "langfuse.trace.metadata.nerdstats.tokens_per_second",
647        output.tokens_per_second.to_string().as_str(),
648    );
649
650    if let Some(ttft_ms) = output.ttft_ms {
651        span.set_attribute(
652            "langfuse.trace.metadata.nerdstats.ttft_ms",
653            ttft_ms.to_string().as_str(),
654        );
655    }
656
657    if let Some(reasoning_tokens) = output.total_reasoning_tokens {
658        span.set_attribute(
659            "langfuse.trace.metadata.nerdstats.total_reasoning_tokens",
660            reasoning_tokens.to_string().as_str(),
661        );
662    }
663
664    let llm_nodes_without_usage_count = output
665        .step_timings
666        .iter()
667        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
668        .count();
669    let token_metrics_available = llm_nodes_without_usage_count == 0;
670    span.set_attribute(
671        "langfuse.trace.metadata.nerdstats.token_metrics_available",
672        if token_metrics_available {
673            "true"
674        } else {
675            "false"
676        },
677    );
678    span.set_attribute(
679        "langfuse.trace.metadata.nerdstats.token_metrics_source",
680        if token_metrics_available {
681            "provider_usage"
682        } else {
683            "provider_stream_usage_unavailable"
684        },
685    );
686    span.set_attribute(
687        "langfuse.trace.metadata.nerdstats.llm_nodes_without_usage_count",
688        llm_nodes_without_usage_count.to_string().as_str(),
689    );
690}
691
692fn apply_langfuse_trace_input_output_attributes(
693    span: &mut dyn WorkflowSpan,
694    workflow_input: &Value,
695    output: &YamlWorkflowRunOutput,
696    payload_mode: YamlWorkflowPayloadMode,
697) {
698    let trace_input = payload_for_span(payload_mode, workflow_input);
699    span.set_attribute("langfuse.trace.input", trace_input.as_str());
700
701    if let Some(terminal_output) = output.terminal_output.as_ref() {
702        let trace_output = payload_for_span(payload_mode, terminal_output);
703        span.set_attribute("langfuse.trace.output", trace_output.as_str());
704    }
705
706    let usage_details = json!({
707        "input": output.total_input_tokens,
708        "output": output.total_output_tokens,
709        "total": output.total_tokens,
710        "reasoning": output.total_reasoning_tokens,
711    })
712    .to_string();
713    span.set_attribute(
714        "langfuse.trace.metadata.usage_details",
715        usage_details.as_str(),
716    );
717}
718
719fn apply_langfuse_observation_usage_attributes(
720    span: &mut dyn WorkflowSpan,
721    usage: &YamlLlmTokenUsage,
722) {
723    let usage_details = json!({
724        "input": usage.prompt_tokens,
725        "output": usage.completion_tokens,
726        "total": usage.total_tokens,
727        "reasoning": usage.reasoning_tokens,
728    })
729    .to_string();
730    span.set_attribute("langfuse.observation.usage_details", usage_details.as_str());
731    span.set_attribute(
732        "gen_ai.usage.input_tokens",
733        usage.prompt_tokens.to_string().as_str(),
734    );
735    span.set_attribute(
736        "gen_ai.usage.output_tokens",
737        usage.completion_tokens.to_string().as_str(),
738    );
739    span.set_attribute(
740        "gen_ai.usage.total_tokens",
741        usage.total_tokens.to_string().as_str(),
742    );
743    if let Some(reasoning_tokens) = usage.reasoning_tokens {
744        span.set_attribute(
745            "gen_ai.usage.reasoning_tokens",
746            reasoning_tokens.to_string().as_str(),
747        );
748    }
749}
750
751fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
752    match mode {
753        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
754        YamlWorkflowPayloadMode::RedactedPayload => json!({
755            "redacted": true,
756            "value_type": match payload {
757                Value::Null => "null",
758                Value::Bool(_) => "bool",
759                Value::Number(_) => "number",
760                Value::String(_) => "string",
761                Value::Array(_) => "array",
762                Value::Object(_) => "object",
763            }
764        })
765        .to_string(),
766    }
767}
768
769fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
770    match mode {
771        YamlToolTraceMode::Full => payload.clone(),
772        YamlToolTraceMode::Redacted => json!({
773            "redacted": true,
774            "value_type": json_type_name(payload),
775        }),
776        YamlToolTraceMode::Off => Value::Null,
777    }
778}
779
780fn validate_json_schema(schema: &Value) -> Result<(), String> {
781    jsonschema::JSONSchema::compile(schema)
782        .map(|_| ())
783        .map_err(|error| format!("invalid JSON schema: {error}"))
784}
785
786fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
787    let validator = jsonschema::JSONSchema::compile(schema)
788        .map_err(|error| format!("invalid JSON schema: {error}"))?;
789    if let Err(errors) = validator.validate(instance) {
790        let message = errors
791            .into_iter()
792            .next()
793            .map(|error| error.to_string())
794            .unwrap_or_else(|| "unknown schema validation error".to_string());
795        return Err(format!("schema validation failed: {message}"));
796    }
797    Ok(())
798}
799
800fn schema_type(schema: &Value) -> Option<&str> {
801    schema.get("type").and_then(Value::as_str)
802}
803
804fn schema_expects_object(schema: &Value) -> bool {
805    schema_type(schema) == Some("object")
806}
807
808fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
809    options.trace.context.as_ref().map(|input| TraceContext {
810        trace_id: input.trace_id.clone(),
811        span_id: input.span_id.clone(),
812        parent_span_id: input.parent_span_id.clone(),
813        traceparent: input.traceparent.clone(),
814        tracestate: input.tracestate.clone(),
815        baggage: input.baggage.clone(),
816    })
817}
818
819fn merged_trace_context_for_worker(
820    span_context: Option<&TraceContext>,
821    resolved_trace_id: Option<&str>,
822    options: &YamlWorkflowRunOptions,
823) -> TraceContext {
824    let input_context = options.trace.context.as_ref();
825    let baggage = if let Some(context) = span_context {
826        if !context.baggage.is_empty() {
827            context.baggage.clone()
828        } else {
829            input_context
830                .map(|value| value.baggage.clone())
831                .unwrap_or_default()
832        }
833    } else {
834        input_context
835            .map(|value| value.baggage.clone())
836            .unwrap_or_default()
837    };
838
839    TraceContext {
840        trace_id: span_context
841            .and_then(|context| context.trace_id.clone())
842            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
843            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
844        span_id: span_context
845            .and_then(|context| context.span_id.clone())
846            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
847        parent_span_id: span_context
848            .and_then(|context| context.parent_span_id.clone())
849            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
850        traceparent: span_context
851            .and_then(|context| context.traceparent.clone())
852            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
853        tracestate: span_context
854            .and_then(|context| context.tracestate.clone())
855            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
856        baggage,
857    }
858}
859
860fn custom_worker_context_with_trace(
861    context: &Value,
862    trace_context: &TraceContext,
863    tenant_context: &YamlWorkflowTraceTenantContext,
864) -> Value {
865    let mut context_with_trace = context.clone();
866    let Some(root) = context_with_trace.as_object_mut() else {
867        return context_with_trace;
868    };
869
870    root.insert(
871        "trace".to_string(),
872        json!({
873            "context": {
874                "trace_id": trace_context.trace_id,
875                "span_id": trace_context.span_id,
876                "parent_span_id": trace_context.parent_span_id,
877                "traceparent": trace_context.traceparent,
878                "tracestate": trace_context.tracestate,
879                "baggage": trace_context.baggage,
880            },
881            "tenant": {
882                "workspace_id": tenant_context.workspace_id,
883                "user_id": tenant_context.user_id,
884                "conversation_id": tenant_context.conversation_id,
885                "request_id": tenant_context.request_id,
886                "run_id": tenant_context.run_id,
887            }
888        }),
889    );
890
891    context_with_trace
892}
893
/// Reports whether raw stream debug events were requested via the
/// `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` environment variable.
///
/// The value is trimmed and compared case-insensitively; `1`, `true`, `yes`,
/// and `on` enable the flag. An unset or unreadable variable disables it.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|raw| {
            matches!(
                raw.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
903
/// Result of turning the raw text of a streamed structured completion into a
/// JSON payload.
#[derive(Debug)]
struct StreamedPayloadResolution {
    /// The parsed (or healed) JSON value.
    payload: Value,
    /// Confidence reported by the healing parser; `None` when the payload was
    /// obtained by strict JSON parsing without healing.
    heal_confidence: Option<f32>,
}
909
910#[derive(Debug, Default)]
911struct StreamJsonAsTextFormatter {
912    raw_json: String,
913    emitted: bool,
914}
915
916impl StreamJsonAsTextFormatter {
917    fn push(&mut self, chunk: &str) {
918        self.raw_json.push_str(chunk);
919    }
920
921    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
922        if self.emitted || !complete {
923            return None;
924        }
925        self.emitted = true;
926        Some(render_json_object_as_text(self.raw_json.as_str()))
927    }
928}
929
930fn render_json_object_as_text(raw_json: &str) -> String {
931    let value = match serde_json::from_str::<Value>(raw_json) {
932        Ok(value) => value,
933        Err(_) => return raw_json.to_string(),
934    };
935    let Some(object) = value.as_object() else {
936        return raw_json.to_string();
937    };
938
939    let mut lines = Vec::with_capacity(object.len());
940    for (key, value) in object {
941        let rendered = match value {
942            Value::String(text) => text.clone(),
943            _ => value.to_string(),
944        };
945        lines.push(format!("{key}: {rendered}"));
946    }
947    lines.join("\n")
948}
949
/// Classification of a streamed token, carried in
/// [`YamlWorkflowEvent::token_kind`].
///
/// Serialized in snake_case, i.e. `"output"` and `"thinking"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    /// Token that is part of the node's output.
    Output,
    /// Token that is not part of the output (reasoning / side text).
    Thinking,
}
956
/// Incremental splitter that separates the first top-level JSON object in a
/// character stream from the text surrounding it.
///
/// State is kept between calls so the object may be spread over any number of
/// deltas.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    /// The opening `{` of the object has been seen.
    started: bool,
    /// The brace closing the object has been seen.
    completed: bool,
    /// Current brace nesting depth inside the object.
    depth: u32,
    /// The scanner is currently inside a JSON string literal.
    in_string: bool,
    /// The previous character inside a string was a backslash.
    escape: bool,
}

impl StructuredJsonDeltaFilter {
    /// Splits `delta` into `(output, thinking)`.
    ///
    /// Characters belonging to the first top-level `{ ... }` object go to
    /// `output`; everything before its opening brace and after its closing
    /// brace goes to `thinking`. A side that received no characters is `None`.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        let mut json_part = String::new();
        let mut side_part = String::new();

        for symbol in delta.chars() {
            // Anything before the first `{` or after the closing `}` is side text.
            if self.completed || (!self.started && symbol != '{') {
                side_part.push(symbol);
                continue;
            }

            json_part.push(symbol);

            if !self.started {
                // This is the opening brace of the object.
                self.started = true;
                self.depth = 1;
                continue;
            }

            if self.in_string {
                // Braces inside string literals must not affect the depth.
                if self.escape {
                    self.escape = false;
                } else if symbol == '\\' {
                    self.escape = true;
                } else if symbol == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match symbol {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    self.depth = self.depth.saturating_sub(1);
                    self.completed = self.depth == 0;
                }
                _ => {}
            }
        }

        let non_empty = |text: String| if text.is_empty() { None } else { Some(text) };
        (non_empty(json_part), non_empty(side_part))
    }

    /// Whether the closing brace of the top-level object has been consumed.
    fn completed(&self) -> bool {
        self.completed
    }
}
1041
/// Returns the trimmed contents of the last ```` ```json ```` fenced block in
/// `raw`.
///
/// Yields `None` when there is no opening fence, when the last opening fence
/// is never closed, or when the block is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const OPEN_FENCE: &str = "```json";
    const CLOSE_FENCE: &str = "```";

    let (_, after_open) = raw.rsplit_once(OPEN_FENCE)?;
    let (body, _) = after_open.split_once(CLOSE_FENCE)?;

    Some(body.trim()).filter(|block| !block.is_empty())
}
1052
/// Scans `raw` from byte offset `start_index` and returns the first
/// brace-balanced `{ ... }` span, trimmed.
///
/// Braces inside JSON string literals (including escaped quotes) are ignored
/// while counting depth. The returned slice starts at `start_index`.
///
/// Returns `None` when:
/// - `start_index` is past the end of `raw` or not on a character boundary
///   (previously this panicked on the slice operation),
/// - a `}` is met before any `{`,
/// - the input ends before the braces balance.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    // `get` rejects out-of-range and mid-character offsets instead of panicking.
    let tail = raw.get(start_index..)?;

    let mut depth = 0u32;
    let mut in_string = false;
    let mut escape = false;

    for (offset, ch) in tail.char_indices() {
        if in_string {
            if escape {
                escape = false;
            } else if ch == '\\' {
                escape = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                // A closing brace with nothing open means there is no object here.
                depth = depth.checked_sub(1)?;
                if depth == 0 {
                    return Some(tail[..offset + ch.len_utf8()].trim());
                }
            }
            _ => {}
        }
    }

    None
}
1093
1094fn extract_last_parsable_object(raw: &str) -> Option<&str> {
1095    let starts: Vec<usize> = raw
1096        .char_indices()
1097        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
1098        .collect();
1099
1100    for start in starts.into_iter().rev() {
1101        let Some(candidate) = extract_balanced_object_from(raw, start) else {
1102            continue;
1103        };
1104        if serde_json::from_str::<Value>(candidate).is_ok() {
1105            return Some(candidate);
1106        }
1107    }
1108
1109    None
1110}
1111
1112fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
1113    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
1114}
1115
1116fn parse_streamed_structured_payload(
1117    raw: &str,
1118    heal: bool,
1119) -> Result<StreamedPayloadResolution, String> {
1120    if !heal {
1121        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
1122            return Ok(StreamedPayloadResolution {
1123                payload,
1124                heal_confidence: None,
1125            });
1126        }
1127
1128        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
1129            "failed to parse streamed structured completion JSON: no JSON object candidate found"
1130                .to_string()
1131        })?;
1132        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
1133            format!(
1134                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
1135            )
1136        })?;
1137        return Ok(StreamedPayloadResolution {
1138            payload,
1139            heal_confidence: None,
1140        });
1141    }
1142
1143    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
1144    let parser = JsonishParser::new();
1145    let healed = parser
1146        .parse(candidate)
1147        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;
1148
1149    Ok(StreamedPayloadResolution {
1150        payload: healed.value,
1151        heal_confidence: Some(healed.confidence),
1152    })
1153}
1154
/// Event passed to [`YamlWorkflowEventSink::emit`].
///
/// Only `event_type` is mandatory; every other field is optional.
/// NOTE(review): which fields are populated for which event type is decided
/// by the emitting code, which is not visible here — confirm against callers.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Name identifying the kind of event.
    pub event_type: String,
    /// Identifier of the node the event refers to, if any.
    pub node_id: Option<String>,
    /// Identifier of the step the event refers to, if any.
    pub step_id: Option<String>,
    /// Kind of the node the event refers to, if any.
    pub node_kind: Option<String>,
    /// Presumably whether the node's output can be streamed — TODO confirm.
    pub streamable: Option<bool>,
    /// Optional human-readable message.
    pub message: Option<String>,
    /// Optional text delta (presumably a streamed token chunk — TODO confirm).
    pub delta: Option<String>,
    /// Classification of `delta` as output or thinking text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    /// Presumably marks tokens coming from a terminal node — TODO confirm.
    pub is_terminal_node_token: Option<bool>,
    /// Optional elapsed time in milliseconds.
    pub elapsed_ms: Option<u128>,
    /// Optional free-form JSON metadata.
    pub metadata: Option<Value>,
}
1169
/// Role of a [`WorkflowMessage`]; an alias for `simple_agent_type::message::Role`.
pub type WorkflowMessageRole = Role;
1171
/// A single chat-style message that can be serialized and deserialized.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    /// Role of the message author.
    pub role: WorkflowMessageRole,
    /// Text content of the message.
    pub content: String,
    /// Optional name; defaults to `None` when absent from the input.
    #[serde(default)]
    pub name: Option<String>,
    /// Optional tool call identifier; defaults to `None` when absent and is
    /// also accepted under the camelCase key `toolCallId` when deserializing.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
1181
/// Serializable record describing one resolved template expression.
///
/// NOTE(review): field meanings below are read off the names and types; the
/// code that fills them in is not visible here — confirm against the producer.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the binding (presumably within its template).
    pub index: usize,
    /// The template expression text.
    pub expression: String,
    /// Path the expression was resolved from.
    pub source_path: String,
    /// The value the expression resolved to.
    pub resolved: Value,
    /// Name of the resolved value's type.
    pub resolved_type: String,
    /// Presumably `true` when the expression could not be resolved — TODO confirm.
    pub missing: bool,
}
1191
/// Severity level attached to a [`YamlWorkflowDiagnostic`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    /// The diagnostic reports an error.
    Error,
    /// The diagnostic reports a warning.
    Warning,
}
1197
/// A single diagnostic message about a workflow; collected in
/// [`YamlWorkflowRunError::Validation`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Node the diagnostic refers to, or `None` when it is not tied to a node.
    pub node_id: Option<String>,
    /// Machine-readable diagnostic code.
    pub code: String,
    /// Whether this is an error or a warning.
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description.
    pub message: String,
}
1205
/// Errors produced while loading, validating or running a YAML workflow.
///
/// The display text of each variant is given by its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    /// A filesystem operation on `path` failed.
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    /// The contents of `path` could not be parsed as workflow YAML.
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    /// The file at `path` was refused before or after parsing; `reason` says why.
    #[error("rejected workflow yaml '{path}': {reason}")]
    FileRejected { path: String, reason: String },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Validation failed; `diagnostics` carries the individual findings and
    /// `diagnostics_count` is the number shown in the message.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1252
/// Receiver for [`YamlWorkflowEvent`]s.
///
/// Implementations must be `Send + Sync`.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Handles one event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Reports whether the consumer has asked for cancellation.
    ///
    /// The default implementation never cancels.
    fn is_cancelled(&self) -> bool {
        false
    }
}
1260
/// Event sink that discards every event and, through the trait default,
/// never reports cancellation.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    /// Intentionally does nothing.
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1266
/// Fixed message text used when a workflow event sink reports cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const CANCELLED_MESSAGE: &str = "workflow event callback cancelled";
    CANCELLED_MESSAGE
}
1270
1271fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1272    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1273}
1274
/// Reasons a YAML workflow cannot be converted to the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    /// The workflow's entry node id does not match any declared node.
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    /// The node has more than one outgoing edge where the IR allows only one.
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    /// The node uses a feature the IR cannot represent; `reason` explains which.
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1284
1285/// Render a YAML workflow graph as Mermaid flowchart.
1286pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1287    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1288        return workflow_to_mermaid(&ir);
1289    }
1290
1291    yaml_workflow_to_mermaid_fallback(workflow)
1292}
1293
1294fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
1295    let mut lines = Vec::new();
1296    lines.push("flowchart TD".to_string());
1297    let mut tool_node_ids: Vec<String> = Vec::new();
1298
1299    for node in &workflow.nodes {
1300        let label = format!("{}\\n({})", node.id, node.kind_name());
1301        lines.push(format!(
1302            "  {}[\"{}\"]",
1303            sanitize_mermaid_id(&node.id),
1304            escape_mermaid_label(label.as_str()),
1305        ));
1306
1307        if let Some(llm) = node.node_type.llm_call.as_ref() {
1308            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
1309                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
1310                lines.push(format!(
1311                    "  {}([\"{}\"])",
1312                    tool_id,
1313                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
1314                ));
1315                lines.push(format!(
1316                    "  {} -.-> {}",
1317                    sanitize_mermaid_id(&node.id),
1318                    tool_id
1319                ));
1320                tool_node_ids.push(tool_id);
1321            }
1322        }
1323    }
1324
1325    let mut emitted: HashSet<(String, String, String)> = HashSet::new();
1326
1327    for edge in &workflow.edges {
1328        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
1329    }
1330
1331    for node in &workflow.nodes {
1332        if let Some(switch) = node.node_type.switch.as_ref() {
1333            for branch in &switch.branches {
1334                emitted.insert((
1335                    node.id.clone(),
1336                    branch.condition.clone(),
1337                    branch.target.clone(),
1338                ));
1339            }
1340            emitted.insert((
1341                node.id.clone(),
1342                "default".to_string(),
1343                switch.default.clone(),
1344            ));
1345        }
1346    }
1347
1348    let mut edges = emitted.into_iter().collect::<Vec<_>>();
1349    edges.sort();
1350
1351    for (from, label, to) in edges {
1352        if label.is_empty() {
1353            lines.push(format!(
1354                "  {} --> {}",
1355                sanitize_mermaid_id(&from),
1356                sanitize_mermaid_id(&to)
1357            ));
1358        } else {
1359            lines.push(format!(
1360                "  {} -- \"{}\" --> {}",
1361                sanitize_mermaid_id(&from),
1362                escape_mermaid_label(&label),
1363                sanitize_mermaid_id(&to)
1364            ));
1365        }
1366    }
1367
1368    if !tool_node_ids.is_empty() {
1369        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
1370        lines.push(format!("  class {} toolNode;", tool_node_ids.join(",")));
1371    }
1372
1373    lines.join("\n")
1374}
1375
1376fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1377    llm.tools
1378        .iter()
1379        .map(|tool| match tool {
1380            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1381            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1382        })
1383        .collect()
1384}
1385
1386/// Load a YAML workflow file and render it as Mermaid flowchart.
1387pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
1388    let (canonical_path, workflow) = load_workflow_yaml_file(workflow_path)?;
1389
1390    let referenced_subgraphs = discover_referenced_subgraphs(canonical_path.as_path(), &workflow)?;
1391    if referenced_subgraphs.is_empty() {
1392        return Ok(yaml_workflow_to_mermaid(&workflow));
1393    }
1394
1395    Ok(yaml_workflow_to_mermaid_with_subgraphs(
1396        &workflow,
1397        &referenced_subgraphs,
1398    ))
1399}
1400
/// A referenced workflow to be drawn as a Mermaid subgraph.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    /// The workflow id as it was written in the reference; used as the
    /// subgraph title and in the "calls ..." edge label.
    alias: String,
    /// The loaded workflow definition to render.
    workflow: YamlWorkflow,
}
1406
/// Intermediate result of rendering one workflow as a block of Mermaid lines.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    /// Mermaid statements for the block, without leading indentation.
    lines: Vec<String>,
    /// Mermaid ids of every tool node drawn in the block.
    tool_node_ids: Vec<String>,
    /// Subset of tool node ids whose tool is named `run_workflow_graph`.
    run_workflow_tool_node_ids: Vec<String>,
    /// Prefixed Mermaid id of the workflow's entry node.
    entry_node_id: String,
}
1414
1415fn yaml_workflow_to_mermaid_with_subgraphs(
1416    workflow: &YamlWorkflow,
1417    subgraphs: &[MermaidSubgraphWorkflow],
1418) -> String {
1419    let mut lines = vec!["flowchart TD".to_string()];
1420
1421    let main_block = render_mermaid_block(workflow, "main");
1422    lines.push(format!(
1423        "  subgraph main_graph[\"Main: {}\"]",
1424        escape_mermaid_label(&workflow.id)
1425    ));
1426    for line in &main_block.lines {
1427        lines.push(format!("    {line}"));
1428    }
1429    lines.push("  end".to_string());
1430
1431    let mut all_tool_nodes = main_block.tool_node_ids.clone();
1432
1433    for (index, subgraph) in subgraphs.iter().enumerate() {
1434        let block_id = format!("subgraph_{}", index + 1);
1435        let block = render_mermaid_block(&subgraph.workflow, &block_id);
1436        lines.push(format!(
1437            "  subgraph {}[\"Subgraph: {}\"]",
1438            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
1439            escape_mermaid_label(&subgraph.alias)
1440        ));
1441        for line in &block.lines {
1442            lines.push(format!("    {line}"));
1443        }
1444        lines.push("  end".to_string());
1445
1446        for tool_node in &main_block.run_workflow_tool_node_ids {
1447            lines.push(format!(
1448                "  {} -. \"{}\" .-> {}",
1449                tool_node,
1450                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
1451                block.entry_node_id
1452            ));
1453        }
1454
1455        all_tool_nodes.extend(block.tool_node_ids);
1456    }
1457
1458    if !all_tool_nodes.is_empty() {
1459        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
1460        lines.push(format!("  class {} toolNode;", all_tool_nodes.join(",")));
1461    }
1462
1463    lines.join("\n")
1464}
1465
1466fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
1467    let mut block = MermaidBlockRender {
1468        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
1469        ..Default::default()
1470    };
1471
1472    for node in &workflow.nodes {
1473        let node_id = prefixed_mermaid_id(prefix, &node.id);
1474        let label = format!("{}\\n({})", node.id, node.kind_name());
1475        block
1476            .lines
1477            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));
1478
1479        if let Some(llm) = node.node_type.llm_call.as_ref() {
1480            for (idx, tool) in llm.tools.iter().enumerate() {
1481                let tool_name = tool_declaration_name(tool);
1482                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
1483                block.lines.push(format!(
1484                    "{}([\"{}\"])",
1485                    tool_id,
1486                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
1487                ));
1488                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
1489                block.tool_node_ids.push(tool_id.clone());
1490
1491                if tool_name == "run_workflow_graph" {
1492                    block.run_workflow_tool_node_ids.push(tool_id);
1493                }
1494            }
1495        }
1496    }
1497
1498    let mut emitted: HashSet<(String, String, String)> = HashSet::new();
1499
1500    for edge in &workflow.edges {
1501        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
1502    }
1503
1504    for node in &workflow.nodes {
1505        if let Some(switch) = node.node_type.switch.as_ref() {
1506            for branch in &switch.branches {
1507                emitted.insert((
1508                    node.id.clone(),
1509                    branch.condition.clone(),
1510                    branch.target.clone(),
1511                ));
1512            }
1513            emitted.insert((
1514                node.id.clone(),
1515                "default".to_string(),
1516                switch.default.clone(),
1517            ));
1518        }
1519    }
1520
1521    let mut edges = emitted.into_iter().collect::<Vec<_>>();
1522    edges.sort();
1523
1524    for (from, label, to) in edges {
1525        let from_id = prefixed_mermaid_id(prefix, &from);
1526        let to_id = prefixed_mermaid_id(prefix, &to);
1527        if label.is_empty() {
1528            block.lines.push(format!("{} --> {}", from_id, to_id));
1529        } else {
1530            block.lines.push(format!(
1531                "{} -- \"{}\" --> {}",
1532                from_id,
1533                escape_mermaid_label(&label),
1534                to_id
1535            ));
1536        }
1537    }
1538
1539    block
1540}
1541
1542fn discover_referenced_subgraphs(
1543    workflow_path: &Path,
1544    workflow: &YamlWorkflow,
1545) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
1546    let workflow_ids = referenced_workflow_ids(workflow);
1547    if workflow_ids.is_empty() {
1548        return Ok(Vec::new());
1549    }
1550
1551    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
1552    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;
1553
1554    let mut discovered = Vec::new();
1555    let mut seen = HashSet::new();
1556
1557    for workflow_id in workflow_ids {
1558        let normalized = normalize_workflow_lookup_key(&workflow_id);
1559        if seen.contains(&normalized) {
1560            continue;
1561        }
1562
1563        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
1564        {
1565            discovered.push(MermaidSubgraphWorkflow {
1566                alias: workflow_id.clone(),
1567                workflow: subworkflow.clone(),
1568            });
1569            seen.insert(normalized);
1570        }
1571    }
1572
1573    Ok(discovered)
1574}
1575
1576fn load_yaml_sibling_workflows(
1577    parent_dir: &Path,
1578    workflow_path: &Path,
1579) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
1580    let mut results = Vec::new();
1581    let canonical_target =
1582        std::fs::canonicalize(workflow_path).unwrap_or_else(|_| workflow_path.to_path_buf());
1583    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
1584        path: parent_dir.display().to_string(),
1585        source,
1586    })?;
1587
1588    for entry in entries {
1589        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
1590            path: parent_dir.display().to_string(),
1591            source,
1592        })?;
1593        let path = entry.path();
1594        if !is_yaml_file(&path) {
1595            continue;
1596        }
1597
1598        let canonical_path = std::fs::canonicalize(&path).unwrap_or(path.clone());
1599        if canonical_path == canonical_target {
1600            continue;
1601        }
1602
1603        let (_, subworkflow) = load_workflow_yaml_file(path.as_path())?;
1604
1605        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
1606            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
1607        }
1608        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
1609    }
1610
1611    Ok(results)
1612}
1613
1614fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
1615    let mut ids = Vec::new();
1616    let mut seen = HashSet::new();
1617
1618    for node in &workflow.nodes {
1619        let prompt = node
1620            .config
1621            .as_ref()
1622            .and_then(|config| config.prompt.as_deref());
1623
1624        if let Some(llm) = node.node_type.llm_call.as_ref() {
1625            for tool in &llm.tools {
1626                if tool_declaration_name(tool) != "run_workflow_graph" {
1627                    continue;
1628                }
1629
1630                for workflow_id in referenced_workflow_ids_from_tool(tool) {
1631                    let normalized = normalize_workflow_lookup_key(&workflow_id);
1632                    if seen.insert(normalized) {
1633                        ids.push(workflow_id);
1634                    }
1635                }
1636
1637                if let Some(prompt_text) = prompt {
1638                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
1639                        let normalized = normalize_workflow_lookup_key(&workflow_id);
1640                        if seen.insert(normalized) {
1641                            ids.push(workflow_id);
1642                        }
1643                    }
1644                }
1645            }
1646        }
1647    }
1648
1649    ids
1650}
1651
1652fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1653    let schema = match tool {
1654        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1655        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1656    };
1657
1658    let mut ids = Vec::new();
1659    if let Some(schema) = schema {
1660        if let Some(workflow_prop) = schema
1661            .get("properties")
1662            .and_then(Value::as_object)
1663            .and_then(|properties| properties.get("workflow_id"))
1664        {
1665            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1666                ids.push(value.to_string());
1667            }
1668
1669            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1670                for value in enum_values {
1671                    if let Some(value) = value.as_str() {
1672                        ids.push(value.to_string());
1673                    }
1674                }
1675            }
1676        }
1677    }
1678
1679    ids
1680}
1681
/// Scans free-form prompt text for `"workflow_id": "<id>"` occurrences and
/// returns the ids in order of appearance.
///
/// After each `"workflow_id"` key the next `:` is located; if the text that
/// follows (ignoring leading whitespace) is a double-quoted string, its
/// trimmed, non-empty contents are collected. Scanning stops when a key has
/// no colon after it.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut ids = Vec::new();
    let mut cursor = prompt;

    while let Some((_, after_key)) = cursor.split_once(KEY) {
        let Some((_, after_colon)) = after_key.split_once(':') else {
            break;
        };

        let quoted = after_colon
            .trim_start()
            .strip_prefix('"')
            .and_then(|rest| rest.split_once('"'));

        cursor = match quoted {
            Some((value, tail)) => {
                let workflow_id = value.trim();
                if !workflow_id.is_empty() {
                    ids.push(workflow_id.to_string());
                }
                // Resume right after the closing quote.
                tail
            }
            // Not a complete quoted string: resume right after the colon.
            None => after_colon,
        };
    }

    ids
}
1709
/// Produces the key used to match workflow ids and file stems: hyphens become
/// underscores and ASCII letters are lower-cased. Other characters are kept
/// as they are.
fn normalize_workflow_lookup_key(value: &str) -> String {
    let mut key = String::with_capacity(value.len());

    for ch in value.chars() {
        key.push(if ch == '-' {
            '_'
        } else {
            ch.to_ascii_lowercase()
        });
    }

    key
}
1719
/// Returns `true` when the path's extension is exactly `yaml` or `yml`
/// (case-sensitive). Paths without an extension, or whose extension is not
/// valid UTF-8, are not YAML files.
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map_or(false, |ext| ext == "yaml" || ext == "yml")
}
1726
1727fn yaml_value_depth(value: &serde_yaml::Value, depth: usize) -> usize {
1728    match value {
1729        serde_yaml::Value::Sequence(items) => items
1730            .iter()
1731            .map(|item| yaml_value_depth(item, depth + 1))
1732            .max()
1733            .unwrap_or(depth),
1734        serde_yaml::Value::Mapping(map) => map
1735            .values()
1736            .map(|item| yaml_value_depth(item, depth + 1))
1737            .max()
1738            .unwrap_or(depth),
1739        _ => depth,
1740    }
1741}
1742
1743fn load_workflow_yaml_file(
1744    workflow_path: &Path,
1745) -> Result<(PathBuf, YamlWorkflow), YamlWorkflowRunError> {
1746    let canonical_path =
1747        std::fs::canonicalize(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1748            path: workflow_path.display().to_string(),
1749            source,
1750        })?;
1751
1752    let metadata =
1753        std::fs::metadata(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
1754            path: canonical_path.display().to_string(),
1755            source,
1756        })?;
1757
1758    if !metadata.is_file() {
1759        return Err(YamlWorkflowRunError::FileRejected {
1760            path: canonical_path.display().to_string(),
1761            reason: "path must reference a regular file".to_string(),
1762        });
1763    }
1764
1765    if !is_yaml_file(&canonical_path) {
1766        return Err(YamlWorkflowRunError::FileRejected {
1767            path: canonical_path.display().to_string(),
1768            reason: "workflow file extension must be .yaml or .yml".to_string(),
1769        });
1770    }
1771
1772    if metadata.len() > MAX_WORKFLOW_YAML_BYTES {
1773        return Err(YamlWorkflowRunError::FileRejected {
1774            path: canonical_path.display().to_string(),
1775            reason: format!(
1776                "workflow yaml is too large ({} bytes > {} bytes)",
1777                metadata.len(),
1778                MAX_WORKFLOW_YAML_BYTES
1779            ),
1780        });
1781    }
1782
1783    let contents =
1784        std::fs::read_to_string(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
1785            path: canonical_path.display().to_string(),
1786            source,
1787        })?;
1788
1789    let yaml_value: serde_yaml::Value =
1790        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1791            path: canonical_path.display().to_string(),
1792            source,
1793        })?;
1794
1795    let depth = yaml_value_depth(&yaml_value, 1);
1796    if depth > MAX_WORKFLOW_YAML_DEPTH {
1797        return Err(YamlWorkflowRunError::FileRejected {
1798            path: canonical_path.display().to_string(),
1799            reason: format!(
1800                "workflow yaml nesting depth {} exceeds limit {}",
1801                depth, MAX_WORKFLOW_YAML_DEPTH
1802            ),
1803        });
1804    }
1805
1806    let workflow: YamlWorkflow =
1807        serde_yaml::from_value(yaml_value).map_err(|source| YamlWorkflowRunError::Parse {
1808            path: canonical_path.display().to_string(),
1809            source,
1810        })?;
1811
1812    Ok((canonical_path, workflow))
1813}
1814
1815fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1816    sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1817}
1818
1819fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
1820    match tool {
1821        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
1822        YamlToolDeclaration::Simplified(simple) => &simple.name,
1823    }
1824}
1825
1826pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
1827    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
1828    if !known_ids.contains(workflow.entry_node.as_str()) {
1829        return Err(YamlToIrError::MissingEntry {
1830            entry_node: workflow.entry_node.clone(),
1831        });
1832    }
1833
1834    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
1835    for edge in &workflow.edges {
1836        outgoing
1837            .entry(edge.from.as_str())
1838            .or_default()
1839            .push(edge.to.as_str());
1840    }
1841
1842    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
1843    nodes.push(Node {
1844        id: YAML_START_NODE_ID.to_string(),
1845        kind: NodeKind::Start {
1846            next: workflow.entry_node.clone(),
1847        },
1848    });
1849
1850    for node in &workflow.nodes {
1851        if let Some(llm) = node.node_type.llm_call.as_ref() {
1852            if node
1853                .config
1854                .as_ref()
1855                .and_then(|c| c.set_globals.as_ref())
1856                .is_some()
1857                || node
1858                    .config
1859                    .as_ref()
1860                    .and_then(|c| c.update_globals.as_ref())
1861                    .is_some()
1862            {
1863                return Err(YamlToIrError::UnsupportedNode {
1864                    node_id: node.id.clone(),
1865                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
1866                        .to_string(),
1867                });
1868            }
1869
1870            if !llm.tools.is_empty() {
1871                return Err(YamlToIrError::UnsupportedNode {
1872                    node_id: node.id.clone(),
1873                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
1874                        .to_string(),
1875                });
1876            }
1877
1878            let next = single_next_for_node(&outgoing, &node.id)?;
1879            nodes.push(Node {
1880                id: node.id.clone(),
1881                kind: NodeKind::Tool {
1882                    tool: YAML_LLM_TOOL_ID.to_string(),
1883                    input: json!({
1884                        "node_id": node.id,
1885                        "model": llm.model,
1886                        "prompt_template": node
1887                            .config
1888                            .as_ref()
1889                            .and_then(|c| c.prompt.clone())
1890                            .unwrap_or_default(),
1891                        "stream": llm.stream.unwrap_or(false),
1892                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
1893                        "heal": llm.heal.unwrap_or(false),
1894                        "messages_path": llm.messages_path,
1895                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
1896                        "output_schema": node
1897                            .config
1898                            .as_ref()
1899                            .and_then(|c| c.output_schema.clone())
1900                            .unwrap_or_else(default_llm_output_schema),
1901                    }),
1902                    next,
1903                },
1904            });
1905            continue;
1906        }
1907
1908        if let Some(worker) = node.node_type.custom_worker.as_ref() {
1909            if node
1910                .config
1911                .as_ref()
1912                .and_then(|c| c.set_globals.as_ref())
1913                .is_some()
1914                || node
1915                    .config
1916                    .as_ref()
1917                    .and_then(|c| c.update_globals.as_ref())
1918                    .is_some()
1919            {
1920                return Err(YamlToIrError::UnsupportedNode {
1921                    node_id: node.id.clone(),
1922                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
1923                        .to_string(),
1924                });
1925            }
1926
1927            let next = single_next_for_node(&outgoing, &node.id)?;
1928            nodes.push(Node {
1929                id: node.id.clone(),
1930                kind: NodeKind::Tool {
1931                    tool: worker.handler.clone(),
1932                    input: node
1933                        .config
1934                        .as_ref()
1935                        .and_then(|c| c.payload.clone())
1936                        .unwrap_or_else(|| json!({})),
1937                    next,
1938                },
1939            });
1940            continue;
1941        }
1942
1943        if let Some(switch) = node.node_type.switch.as_ref() {
1944            nodes.push(Node {
1945                id: node.id.clone(),
1946                kind: NodeKind::Router {
1947                    routes: switch
1948                        .branches
1949                        .iter()
1950                        .map(|b| RouterRoute {
1951                            when: rewrite_yaml_condition_to_ir(&b.condition),
1952                            next: b.target.clone(),
1953                        })
1954                        .collect(),
1955                    default: switch.default.clone(),
1956                },
1957            });
1958            continue;
1959        }
1960
1961        return Err(YamlToIrError::UnsupportedNode {
1962            node_id: node.id.clone(),
1963            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
1964        });
1965    }
1966
1967    Ok(WorkflowDefinition {
1968        version: WORKFLOW_IR_V0.to_string(),
1969        name: workflow.id.clone(),
1970        nodes,
1971    })
1972}
1973
1974fn single_next_for_node(
1975    outgoing: &HashMap<&str, Vec<&str>>,
1976    node_id: &str,
1977) -> Result<Option<String>, YamlToIrError> {
1978    match outgoing.get(node_id) {
1979        None => Ok(None),
1980        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1981        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1982            node_id: node_id.to_string(),
1983        }),
1984    }
1985}
1986
/// Rewrites a YAML switch condition so that it addresses the IR runtime scope.
///
/// Every `$.nodes.` prefix becomes `$.node_outputs.`, every `.output.` path segment
/// is collapsed to `.`, and a single trailing `.output` is dropped.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    const TRAILING_OUTPUT: &str = ".output";

    let mut rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    if rewritten.ends_with(TRAILING_OUTPUT) {
        // The suffix is pure ASCII, so the cut always lands on a char boundary.
        rewritten.truncate(rewritten.len() - TRAILING_OUTPUT.len());
    }
    rewritten
}
1997
/// Turns an arbitrary node id into an identifier usable in Mermaid output.
///
/// Ids that do not start with an ASCII letter or `_` (including the empty id) are
/// prefixed with `n_`. Afterwards every character that is not ASCII alphanumeric
/// or `_` is replaced by `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let has_valid_start =
        id.starts_with(|first: char| first.is_ascii_alphabetic() || first == '_');
    let prefix = if has_valid_start { "" } else { "n_" };

    prefix
        .chars()
        .chain(id.chars())
        .map(|ch| match ch {
            keep if keep.is_ascii_alphanumeric() || keep == '_' => keep,
            _ => '_',
        })
        .collect()
}
2021
/// Escapes a label for embedding inside a double-quoted Mermaid string by putting a
/// backslash in front of every `"` character. All other characters pass through.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2025
/// Everything a [`YamlWorkflowLlmExecutor`] receives for a single LLM node execution.
///
/// The per-field notes below describe how `try_run_yaml_via_ir_runtime` in this file
/// populates the request; other call sites may fill the fields differently.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    /// Id of the workflow node being executed.
    pub node_id: String,
    // NOTE(review): presumably marks the last node of the run; the IR path always
    // passes `false` — confirm against the executor implementation.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    /// Model name to call (the IR path passes the model after applying any override).
    pub model: String,
    /// Conversation messages; the IR path sets this only when the node configures a
    /// `messages_path`, otherwise it is `None`.
    pub messages: Option<Vec<Message>>,
    pub append_prompt_as_user: bool,
    /// Prompt after template interpolation against the execution context.
    pub prompt: String,
    /// Raw, un-interpolated prompt template.
    pub prompt_template: String,
    /// Bindings collected from `prompt_template` against the execution context.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// Output schema for the node (the IR path falls back to the default LLM schema).
    pub schema: Value,
    pub stream: bool,
    pub heal: bool,
    /// Tools offered to the model; empty on the IR path.
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    /// Context value the prompt was interpolated against.
    pub execution_context: Value,
    /// `input.email_text` from the execution context, or an empty string when absent.
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub tenant_context: YamlWorkflowTraceTenantContext,
    pub trace_sampled: bool,
}
2052
/// A tool made available to an LLM node: its definition plus an optional schema
/// for the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    /// Tool definition handed to the model.
    pub definition: ToolDefinition,
    /// Optional schema describing the tool's output.
    // NOTE(review): how (or whether) this schema is enforced is not visible here.
    pub output_schema: Option<Value>,
}
2058
/// Backend that performs the LLM call for a workflow node.
///
/// Implementations must be `Send + Sync` so they can be shared across async tasks.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Executes `request` and returns the structured result, or a human-readable
    /// error message on failure.
    ///
    /// `event_sink`, when provided, is available for emitting workflow events during
    /// the call; callers may pass `None`.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
2067
/// Backend that runs `custom_worker` nodes.
///
/// Implementations must be `Send + Sync` so they can be shared across async tasks.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Invokes the worker identified by `handler`.
    ///
    /// * `handler` - name of the worker handler to run.
    /// * `payload` - input payload configured for the node.
    /// * `email_text` - email text associated with the run (may be empty).
    /// * `context` - execution context made available to the worker.
    ///
    /// Returns the worker's JSON output, or a human-readable error message.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
2078
2079pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2080    workflow: &YamlWorkflow,
2081    workflow_input: &Value,
2082    client: &SimpleAgentsClient,
2083    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2084    event_sink: Option<&dyn YamlWorkflowEventSink>,
2085    options: &YamlWorkflowRunOptions,
2086) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2087    let executor = BorrowedClientExecutor {
2088        client,
2089        custom_worker,
2090        run_options: options.clone(),
2091    };
2092    run_workflow_yaml_with_custom_worker_and_events_and_options(
2093        workflow,
2094        workflow_input,
2095        &executor,
2096        custom_worker,
2097        event_sink,
2098        options,
2099    )
2100    .await
2101}
2102
/// Runs a YAML workflow with a caller-supplied LLM executor.
///
/// This is a public entry point that forwards every argument unchanged to
/// `execute::run_workflow_yaml_with_custom_worker_and_events_and_options_impl`
/// and returns its result.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    execute::run_workflow_yaml_with_custom_worker_and_events_and_options_impl(
        workflow,
        workflow_input,
        executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2121
/// Attempts to execute a YAML workflow on the canonical IR runtime.
///
/// The workflow is lowered with `yaml_workflow_to_ir` and checked with
/// `validate_and_normalize`, then executed by a `WorkflowRuntime` whose tool executor
/// dispatches LLM nodes to `executor` and every other tool to `custom_worker`.
///
/// Returns `Ok(None)` when the workflow cannot take this path: lowering reports
/// `UnsupportedNode` or `MultipleOutgoingEdge`, IR validation fails, or the runtime
/// reports a validation error. Returns `Ok(Some(output))` on a completed run.
///
/// # Errors
///
/// * `YamlWorkflowRunError::InvalidInput` for any other lowering error.
/// * `YamlWorkflowRunError::IrRuntime` for any non-validation runtime error.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lowering failures that only mean "not expressible in IR" are not errors here;
    // they signal the caller that this path is unavailable.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    // The validation result itself is discarded; only pass/fail matters.
    if validate_and_normalize(&ir).is_err() {
        return Ok(None);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // A root "workflow.run" span is opened only when the run is sampled.
    // NOTE(review): the early returns further down (runtime validation/other errors)
    // leave this span without an explicit `end()`/flush — confirm that dropping the
    // span is sufficient for the tracer in use.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // LLM executor handed to the runtime that always fails: `yaml_workflow_to_ir`
    // lowers `llm_call` nodes to tool nodes, so the runtime's LLM path is not
    // expected to be exercised.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Tool executor bridging IR tool nodes back to the YAML-level executors.
    // The mutex-guarded maps accumulate token usage and resolved model names per
    // node so they can be reported after the run.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // LLM nodes: unpack the settings that `yaml_workflow_to_ir` stored in the
            // tool input and forward them to the YAML LLM executor.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // Messages are only loaded when the node configured a path; a parse
                // failure fails the tool call.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // Tool calling is disabled on this path (lowering rejects LLM nodes
                // that declare tools), hence the empty tool list and `Off` trace mode.
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    tenant_context: self.tenant_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                // No event sink is forwarded to the executor on this path.
                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record usage/model bookkeeping for successful calls only. A failed
                // (poisoned) lock is skipped silently rather than failing the node.
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Any other tool id is treated as a custom worker handler name; without a
            // configured worker the tool is reported as not found.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-invocation "handler.invoke" span, parented to the workflow span
            // context and created only for sampled runs.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                apply_trace_identity_attributes(span.as_mut(), self.trace_id.as_deref());
                span.set_attribute("handler_name", input.tool.as_str());
                apply_trace_tenant_attributes_from_tenant(span.as_mut(), &self.tenant_context);
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Minimal options value carrying only the trace inputs, built so the
            // trace-merging helper below can be reused.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            // Mark the span with the outcome, then close it regardless of result.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    // The IR was already validated above, so the runtime's own pre-run validation
    // is switched off.
    let runtime_options = WorkflowRuntimeOptions {
        validate_before_run: false,
        ..WorkflowRuntimeOptions::default()
    };
    let runtime = WorkflowRuntime::new(ir, &NoopLlm, Some(&tool_executor), runtime_options);

    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-shape runtime outputs into the YAML output format (`{"output": ...}` per
    // node), hiding the synthetic start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    // Snapshot the bookkeeping maps; a poisoned lock yields an empty map.
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        let node_id = execution.node_id;
        trace.push(node_id.clone());
        let usage = node_usage_map.get(&node_id);
        // NOTE(review): per-node elapsed time is reported as 0 on this path (here and
        // in the step timings below); only token counts are populated.
        if let Some(usage) = usage {
            llm_node_metrics.insert(
                node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: node_id.clone(),
            node_kind: "ir_runtime".to_string(),
            model_name: llm_node_models.get(&node_id).cloned(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        // Metadata is attached only when a trace id was resolved.
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    // Finalise the workflow span (if any) with the run's input/output and stats,
    // then flush the tracer.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_langfuse_trace_input_output_attributes(
            span.as_mut(),
            workflow_input,
            &output,
            options.telemetry.payload_mode,
        );
        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(output))
}
2546
2547async fn execute_subworkflow_tool_call(
2548    payload: &Value,
2549    context: &Value,
2550    client: &SimpleAgentsClient,
2551    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2552    parent_options: &YamlWorkflowRunOptions,
2553    parent_trace_context: Option<&TraceContext>,
2554    resolved_trace_id: Option<&str>,
2555) -> Result<Value, String> {
2556    let workflow_id = payload
2557        .get("workflow_id")
2558        .and_then(Value::as_str)
2559        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;
2560
2561    let input_context = context
2562        .get("input")
2563        .and_then(Value::as_object)
2564        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;
2565
2566    let registry = input_context
2567        .get("workflow_registry")
2568        .and_then(Value::as_object)
2569        .ok_or_else(|| {
2570            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
2571                .to_string()
2572        })?;
2573
2574    let workflow_path = registry
2575        .get(workflow_id)
2576        .and_then(Value::as_str)
2577        .ok_or_else(|| {
2578            format!(
2579                "workflow_registry has no entry for workflow_id '{}'",
2580                workflow_id
2581            )
2582        })?;
2583
2584    let parent_depth = input_context
2585        .get("__subgraph_depth")
2586        .and_then(Value::as_u64)
2587        .unwrap_or(0);
2588    let max_depth = input_context
2589        .get("__subgraph_max_depth")
2590        .and_then(Value::as_u64)
2591        .unwrap_or(3);
2592
2593    if parent_depth >= max_depth {
2594        return Err(format!(
2595            "run_workflow_graph depth limit reached (depth={}, max={})",
2596            parent_depth, max_depth
2597        ));
2598    }
2599
2600    let mut subworkflow_input = payload
2601        .get("input")
2602        .and_then(Value::as_object)
2603        .cloned()
2604        .unwrap_or_default();
2605
2606    if !subworkflow_input.contains_key("messages") {
2607        if let Some(messages) = input_context.get("messages") {
2608            subworkflow_input.insert("messages".to_string(), messages.clone());
2609        }
2610    }
2611
2612    if !subworkflow_input.contains_key("email_text") {
2613        if let Some(email_text) = input_context.get("email_text") {
2614            subworkflow_input.insert("email_text".to_string(), email_text.clone());
2615        }
2616    }
2617
2618    if !subworkflow_input.contains_key("workflow_registry") {
2619        subworkflow_input.insert(
2620            "workflow_registry".to_string(),
2621            Value::Object(registry.clone()),
2622        );
2623    }
2624
2625    subworkflow_input.insert(
2626        "__subgraph_depth".to_string(),
2627        Value::Number(serde_json::Number::from(parent_depth + 1)),
2628    );
2629    subworkflow_input.insert(
2630        "__subgraph_max_depth".to_string(),
2631        Value::Number(serde_json::Number::from(max_depth)),
2632    );
2633
2634    let subworkflow_options =
2635        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);
2636
2637    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
2638        Path::new(workflow_path),
2639        &Value::Object(subworkflow_input),
2640        client,
2641        custom_worker,
2642        None,
2643        &subworkflow_options,
2644    )
2645    .await
2646    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;
2647
2648    Ok(json!({
2649        "workflow_id": workflow_id,
2650        "workflow_path": workflow_path,
2651        "terminal_node": output.terminal_node,
2652        "terminal_output": output.terminal_output,
2653        "trace": output.trace,
2654    }))
2655}
2656
2657fn build_subworkflow_options(
2658    parent_options: &YamlWorkflowRunOptions,
2659    parent_trace_context: Option<&TraceContext>,
2660    resolved_trace_id: Option<&str>,
2661) -> YamlWorkflowRunOptions {
2662    let mut subworkflow_options = parent_options.clone();
2663    if parent_trace_context.is_some() || resolved_trace_id.is_some() {
2664        let trace_context = YamlWorkflowTraceContextInput {
2665            trace_id: resolved_trace_id
2666                .map(|value| value.to_string())
2667                .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
2668            span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
2669            parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
2670            traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
2671            tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
2672            baggage: parent_trace_context
2673                .map(|ctx| ctx.baggage.clone())
2674                .unwrap_or_default(),
2675        };
2676        subworkflow_options.trace.context = Some(trace_context);
2677    }
2678    subworkflow_options
2679}
2680
/// Top-level workflow document as deserialized from YAML.
///
/// `id` and `entry_node` are required keys; `nodes` and `edges` fall back to
/// empty lists when the document omits them.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier declared by the document.
    pub id: String,
    /// Id of a node in `nodes`; presumably the node execution starts from.
    pub entry_node: String,
    /// Node declarations (empty when the key is absent).
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Explicit `from` -> `to` links between nodes (empty when the key is absent).
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
2690
/// A single node declaration inside a [`YamlWorkflow`].
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Node identifier, referenced by edges, switch targets and `entry_node`.
    pub id: String,
    /// Which kind of node this is (see [`YamlNodeType`]).
    pub node_type: YamlNodeType,
    /// Optional per-node settings such as prompt, payload or schema.
    pub config: Option<YamlNodeConfig>,
}
2697
2698impl YamlNode {
2699    fn kind_name(&self) -> &'static str {
2700        if self.node_type.llm_call.is_some() {
2701            "llm_call"
2702        } else if self.node_type.switch.is_some() {
2703            "switch"
2704        } else if self.node_type.custom_worker.is_some() {
2705            "custom_worker"
2706        } else {
2707            "unknown"
2708        }
2709    }
2710}
2711
/// The `node_type` mapping of a node: one optional entry per supported kind.
///
/// All fields are optional, so a mapping with none of these keys still
/// deserializes (with every field `None`).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    /// Settings for an LLM completion node.
    pub llm_call: Option<YamlLlmCall>,
    /// Settings for a conditional routing node.
    pub switch: Option<YamlSwitch>,
    /// Settings for a node handled by a custom worker.
    pub custom_worker: Option<YamlCustomWorker>,
}
2718
/// Configuration of an `llm_call` node.
///
/// Only `model` is required; `tools_format` and `tools` take their defaults
/// when omitted and every other key is optional.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model name to request (e.g. `gpt-4.1`).
    pub model: String,
    /// Optional streaming toggle.
    pub stream: Option<bool>,
    /// NOTE(review): presumably streams JSON output as plain text; confirm against the executor.
    pub stream_json_as_text: Option<bool>,
    /// Optional toggle for response healing.
    pub heal: Option<bool>,
    /// NOTE(review): presumably a path to a message list in the workflow context; confirm.
    pub messages_path: Option<String>,
    /// NOTE(review): presumably appends the rendered prompt as a user message; confirm.
    pub append_prompt_as_user: Option<bool>,
    /// Declared format of the entries in `tools` (defaults to [`YamlToolFormat::Openai`]).
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tool declarations offered to the model (empty when omitted).
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional tool-choice directive.
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Optional cap on tool roundtrips (fits in a `u8`).
    pub max_tool_roundtrips: Option<u8>,
    /// NOTE(review): presumably the globals key under which tool calls are recorded; confirm.
    pub tool_calls_global_key: Option<String>,
}
2735
/// Format used for tool declarations on an LLM node.
///
/// Serialized in snake_case (`openai`, `simplified`); `Openai` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    /// OpenAI-style declarations (default).
    #[default]
    Openai,
    /// Flattened declarations (see [`YamlSimplifiedToolDeclaration`]).
    Simplified,
}
2743
/// A tool declaration in either supported shape.
///
/// The enum is untagged, so serde tries the variants in declaration order and
/// keeps the first one whose fields match: `OpenAi` first, then `Simplified`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    /// Declaration with a nested `function` object.
    OpenAi(YamlOpenAiToolDeclaration),
    /// Declaration with `name` / `input_schema` at the top level.
    Simplified(YamlSimplifiedToolDeclaration),
}
2750
/// OpenAI-style tool declaration: an optional `type` plus a `function` object.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// Value of the `type` key, if present.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    /// The function being declared (required).
    pub function: YamlOpenAiToolFunction,
}
2757
/// The `function` object of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    /// Tool name (required).
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Optional parameters value, kept as raw JSON.
    pub parameters: Option<Value>,
    /// Optional output schema, kept as raw JSON.
    pub output_schema: Option<Value>,
}
2765
/// Simplified tool declaration with all keys at the top level.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    /// Tool name (required).
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Input schema, kept as raw JSON (required).
    pub input_schema: Value,
    /// Optional output schema, kept as raw JSON.
    pub output_schema: Option<Value>,
}
2773
/// Accepted shapes for an LLM node's `tool_choice` setting.
///
/// The enum is untagged: serde tries `Mode`, then `Function`, then `OpenAi`,
/// and keeps the first variant that deserializes successfully.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    /// A bare tool-choice mode value.
    Mode(ToolChoiceMode),
    /// Shorthand mapping with a single `function` key holding a string.
    Function { function: String },
    /// A full [`ToolChoiceTool`] object.
    OpenAi(ToolChoiceTool),
}
2781
/// Configuration of a `switch` node: conditional branches plus a fallback.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    /// Condition/target pairs (empty when the key is absent).
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Fallback target (required); presumably used when no branch condition matches.
    pub default: String,
}
2788
/// One branch of a [`YamlSwitch`].
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    /// Condition expression, e.g. `$.nodes.<id>.output.<field> == "value"`.
    pub condition: String,
    /// Node id routed to for this branch.
    pub target: String,
}
2794
/// Configuration of a `custom_worker` node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    /// Name of the handler to invoke (e.g. `GetRagData`).
    pub handler: String,
}
2799
/// Optional `config` section of a node. Every key is optional.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt template text; may contain `{{ ... }}` placeholders.
    pub prompt: Option<String>,
    /// Output schema kept as raw JSON; the key `schema` is accepted as an alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Free-form payload kept as raw JSON (used by custom worker nodes).
    pub payload: Option<Value>,
    /// NOTE(review): presumably maps a global name to a source expression; confirm.
    pub set_globals: Option<HashMap<String, String>>,
    /// Per-global update operations keyed by global name.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
2809
/// One entry of `update_globals`: an operation applied to a workflow global.
///
/// NOTE(review): the set of valid `op` values and how `from` / `by` are
/// interpreted is defined by the code that applies the update, not here.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    /// Operation name (required).
    pub op: String,
    /// Optional source reference for the operation.
    pub from: Option<String>,
    /// Optional numeric operand.
    pub by: Option<f64>,
}
2816
/// A directed link between two nodes of a [`YamlWorkflow`].
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the destination node.
    pub to: String,
}
2822
2823#[cfg(test)]
2824mod tests {
2825    use super::*;
2826    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
2827    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
2828    use simple_agent_type::tool::{ToolCallFunction, ToolType};
2829    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
2830    use simple_agents_core::SimpleAgentsClientBuilder;
2831    use std::collections::BTreeMap;
2832    use std::fs;
2833    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2834    use std::sync::{Arc, Mutex, OnceLock};
2835
2836    fn stream_debug_env_lock() -> &'static Mutex<()> {
2837        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2838        LOCK.get_or_init(|| Mutex::new(()))
2839    }
2840
    /// Stateless test LLM executor that answers with canned payloads chosen by
    /// substrings of the incoming prompt.
    struct MockExecutor;
2842
    /// Event sink that stores a clone of every emitted event for later inspection.
    struct RecordingSink {
        /// Events received so far, in emission order.
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }
2846
    /// Event sink that reports itself as cancelled once any event has been emitted.
    struct CancelAfterFirstEventSink {
        /// Set to `true` by `emit`; read by `is_cancelled`.
        cancelled: AtomicBool,
    }
2850
2851    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
2852        fn emit(&self, _event: &YamlWorkflowEvent) {
2853            self.cancelled.store(true, Ordering::SeqCst);
2854        }
2855
2856        fn is_cancelled(&self) -> bool {
2857            self.cancelled.load(Ordering::SeqCst)
2858        }
2859    }
2860
    /// Test LLM executor that counts how many completions were requested.
    struct CountingExecutor {
        /// Number of `complete_structured` calls observed.
        call_count: AtomicUsize,
    }
2864
    /// Test custom worker that remembers the context of its most recent call.
    struct CapturingWorker {
        /// Clone of the last `context` argument, or `None` before the first call.
        context: Mutex<Option<Value>>,
    }
2868
    /// Mock provider that asks for a `get_customer_context` tool call first and
    /// returns a final answer once the request contains a tool result.
    struct ToolLoopProvider;
2870
    /// Mock provider that always responds with a call to a tool named `unknown_tool`.
    struct UnknownToolProvider;
2872
    /// Mock provider whose single response reports reasoning tokens in its usage.
    struct ReasoningUsageProvider;
2874
    /// Mock provider combining a one-step tool loop with reasoning-token usage on
    /// both the tool-call response and the final response.
    struct ToolLoopReasoningProvider;
2876
    /// Span test double that keeps the attributes set on it.
    #[derive(Default)]
    struct CapturingSpan {
        /// Attribute key/value pairs; a later value for the same key replaces the earlier one.
        attributes: BTreeMap<String, String>,
    }
2881
2882    impl WorkflowSpan for CapturingSpan {
2883        fn set_attribute(&mut self, key: &str, value: &str) {
2884            self.attributes.insert(key.to_string(), value.to_string());
2885        }
2886
2887        fn add_event(&mut self, _name: &str) {}
2888
2889        fn end(self: Box<Self>) {}
2890    }
2891
2892    #[async_trait]
2893    impl Provider for ToolLoopProvider {
2894        fn name(&self) -> &str {
2895            "openai"
2896        }
2897
2898        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
2899            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
2900            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
2901        }
2902
2903        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
2904            let request: CompletionRequest =
2905                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
2906
2907            let has_tools = request
2908                .tools
2909                .as_ref()
2910                .is_some_and(|tools| !tools.is_empty());
2911            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
2912
2913            let response = if has_tools && !has_tool_result {
2914                CompletionResponse {
2915                    id: "resp_tool_1".to_string(),
2916                    model: request.model.clone(),
2917                    choices: vec![CompletionChoice {
2918                        index: 0,
2919                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
2920                            id: "call_get_context".to_string(),
2921                            tool_type: ToolType::Function,
2922                            function: ToolCallFunction {
2923                                name: "get_customer_context".to_string(),
2924                                arguments: "{\"order_id\":\"123\"}".to_string(),
2925                            },
2926                        }]),
2927                        finish_reason: FinishReason::ToolCalls,
2928                        logprobs: None,
2929                    }],
2930                    usage: Usage::new(10, 5),
2931                    created: None,
2932                    provider: Some(self.name().to_string()),
2933                    healing_metadata: None,
2934                }
2935            } else if has_tools && has_tool_result {
2936                CompletionResponse {
2937                    id: "resp_tool_2".to_string(),
2938                    model: request.model.clone(),
2939                    choices: vec![CompletionChoice {
2940                        index: 0,
2941                        message: Message::assistant("{\"state\":\"done\"}"),
2942                        finish_reason: FinishReason::Stop,
2943                        logprobs: None,
2944                    }],
2945                    usage: Usage::new(12, 6),
2946                    created: None,
2947                    provider: Some(self.name().to_string()),
2948                    healing_metadata: None,
2949                }
2950            } else {
2951                let prompt = request
2952                    .messages
2953                    .iter()
2954                    .rev()
2955                    .find(|m| m.role == Role::User)
2956                    .map(|m| m.content.clone())
2957                    .unwrap_or_default();
2958                let payload = json!({
2959                    "subject": "ok",
2960                    "body": prompt,
2961                })
2962                .to_string();
2963                CompletionResponse {
2964                    id: "resp_final".to_string(),
2965                    model: request.model.clone(),
2966                    choices: vec![CompletionChoice {
2967                        index: 0,
2968                        message: Message::assistant(payload),
2969                        finish_reason: FinishReason::Stop,
2970                        logprobs: None,
2971                    }],
2972                    usage: Usage::new(8, 4),
2973                    created: None,
2974                    provider: Some(self.name().to_string()),
2975                    healing_metadata: None,
2976                }
2977            };
2978
2979            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
2980            Ok(ProviderResponse::new(200, body))
2981        }
2982
2983        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
2984            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
2985        }
2986    }
2987
2988    #[async_trait]
2989    impl Provider for UnknownToolProvider {
2990        fn name(&self) -> &str {
2991            "openai"
2992        }
2993
2994        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
2995            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
2996            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
2997        }
2998
2999        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3000            let request: CompletionRequest =
3001                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3002
3003            let response = CompletionResponse {
3004                id: "resp_unknown_tool".to_string(),
3005                model: request.model,
3006                choices: vec![CompletionChoice {
3007                    index: 0,
3008                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
3009                        id: "call_unknown".to_string(),
3010                        tool_type: ToolType::Function,
3011                        function: ToolCallFunction {
3012                            name: "unknown_tool".to_string(),
3013                            arguments: "{}".to_string(),
3014                        },
3015                    }]),
3016                    finish_reason: FinishReason::ToolCalls,
3017                    logprobs: None,
3018                }],
3019                usage: Usage::new(5, 2),
3020                created: None,
3021                provider: Some(self.name().to_string()),
3022                healing_metadata: None,
3023            };
3024
3025            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3026            Ok(ProviderResponse::new(200, body))
3027        }
3028
3029        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3030            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3031        }
3032    }
3033
3034    #[async_trait]
3035    impl Provider for ReasoningUsageProvider {
3036        fn name(&self) -> &str {
3037            "openai"
3038        }
3039
3040        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
3041            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
3042            Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
3043        }
3044
3045        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3046            let request: CompletionRequest =
3047                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3048
3049            let mut usage = Usage::new(9, 5);
3050            usage.reasoning_tokens = Some(4);
3051            let response = CompletionResponse {
3052                id: "resp_reasoning".to_string(),
3053                model: request.model,
3054                choices: vec![CompletionChoice {
3055                    index: 0,
3056                    message: Message::assistant("{\"state\":\"ok\"}"),
3057                    finish_reason: FinishReason::Stop,
3058                    logprobs: None,
3059                }],
3060                usage,
3061                created: None,
3062                provider: Some(self.name().to_string()),
3063                healing_metadata: None,
3064            };
3065
3066            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3067            Ok(ProviderResponse::new(200, body))
3068        }
3069
3070        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3071            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3072        }
3073    }
3074
3075    #[async_trait]
3076    impl Provider for ToolLoopReasoningProvider {
3077        fn name(&self) -> &str {
3078            "openai"
3079        }
3080
3081        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
3082            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
3083            Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
3084        }
3085
3086        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3087            let request: CompletionRequest =
3088                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3089
3090            let has_tools = request
3091                .tools
3092                .as_ref()
3093                .is_some_and(|tools| !tools.is_empty());
3094            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
3095
3096            let response = if has_tools && !has_tool_result {
3097                let mut usage = Usage::new(10, 5);
3098                usage.reasoning_tokens = Some(2);
3099                CompletionResponse {
3100                    id: "resp_tool_reasoning_1".to_string(),
3101                    model: request.model.clone(),
3102                    choices: vec![CompletionChoice {
3103                        index: 0,
3104                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
3105                            id: "call_get_context".to_string(),
3106                            tool_type: ToolType::Function,
3107                            function: ToolCallFunction {
3108                                name: "get_customer_context".to_string(),
3109                                arguments: "{\"order_id\":\"123\"}".to_string(),
3110                            },
3111                        }]),
3112                        finish_reason: FinishReason::ToolCalls,
3113                        logprobs: None,
3114                    }],
3115                    usage,
3116                    created: None,
3117                    provider: Some(self.name().to_string()),
3118                    healing_metadata: None,
3119                }
3120            } else {
3121                let mut usage = Usage::new(12, 6);
3122                usage.reasoning_tokens = Some(3);
3123                CompletionResponse {
3124                    id: "resp_tool_reasoning_2".to_string(),
3125                    model: request.model,
3126                    choices: vec![CompletionChoice {
3127                        index: 0,
3128                        message: Message::assistant("{\"state\":\"done\"}"),
3129                        finish_reason: FinishReason::Stop,
3130                        logprobs: None,
3131                    }],
3132                    usage,
3133                    created: None,
3134                    provider: Some(self.name().to_string()),
3135                    healing_metadata: None,
3136                }
3137            };
3138
3139            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3140            Ok(ProviderResponse::new(200, body))
3141        }
3142
3143        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3144            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3145        }
3146    }
3147
    /// Test custom worker that returns the same payload for every call.
    struct FixedToolWorker {
        /// Value cloned and returned by each `execute` call.
        payload: Value,
    }
3151
    /// Test custom worker that counts how many times it was executed.
    struct CountingToolWorker {
        /// Number of `execute` calls observed.
        execute_calls: AtomicUsize,
    }
3155
3156    #[async_trait]
3157    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
3158        async fn execute(
3159            &self,
3160            _handler: &str,
3161            _payload: &Value,
3162            _email_text: &str,
3163            _context: &Value,
3164        ) -> Result<Value, String> {
3165            Ok(self.payload.clone())
3166        }
3167    }
3168
3169    #[async_trait]
3170    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
3171        async fn execute(
3172            &self,
3173            _handler: &str,
3174            _payload: &Value,
3175            _email_text: &str,
3176            _context: &Value,
3177        ) -> Result<Value, String> {
3178            self.execute_calls.fetch_add(1, Ordering::SeqCst);
3179            Ok(json!({"ok": true}))
3180        }
3181    }
3182
3183    #[async_trait]
3184    impl YamlWorkflowLlmExecutor for CountingExecutor {
3185        async fn complete_structured(
3186            &self,
3187            _request: YamlLlmExecutionRequest,
3188            _event_sink: Option<&dyn YamlWorkflowEventSink>,
3189        ) -> Result<YamlLlmExecutionResult, String> {
3190            self.call_count.fetch_add(1, Ordering::SeqCst);
3191            Ok(YamlLlmExecutionResult {
3192                payload: json!({"state":"ok"}),
3193                usage: None,
3194                ttft_ms: None,
3195                tool_calls: Vec::new(),
3196            })
3197        }
3198    }
3199
3200    impl YamlWorkflowEventSink for RecordingSink {
3201        fn emit(&self, event: &YamlWorkflowEvent) {
3202            self.events
3203                .lock()
3204                .expect("recording sink lock should not be poisoned")
3205                .push(event.clone());
3206        }
3207    }
3208
3209    #[async_trait]
3210    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
3211        async fn execute(
3212            &self,
3213            _handler: &str,
3214            _payload: &Value,
3215            _email_text: &str,
3216            context: &Value,
3217        ) -> Result<Value, String> {
3218            let mut guard = self
3219                .context
3220                .lock()
3221                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
3222            *guard = Some(context.clone());
3223            Ok(json!({"ok": true}))
3224        }
3225    }
3226
3227    #[async_trait]
3228    impl YamlWorkflowLlmExecutor for MockExecutor {
3229        async fn complete_structured(
3230            &self,
3231            request: YamlLlmExecutionRequest,
3232            _event_sink: Option<&dyn YamlWorkflowEventSink>,
3233        ) -> Result<YamlLlmExecutionResult, String> {
3234            let prompt = request.prompt;
3235            if prompt.contains("exactly one category") {
3236                return Ok(YamlLlmExecutionResult {
3237                    payload: json!({"category":"termination","reason":"mock"}),
3238                    usage: Some(YamlLlmTokenUsage {
3239                        prompt_tokens: 10,
3240                        completion_tokens: 5,
3241                        total_tokens: 15,
3242                        reasoning_tokens: None,
3243                    }),
3244                    ttft_ms: None,
3245                    tool_calls: Vec::new(),
3246                });
3247            }
3248            if prompt.contains("Determine termination subtype") {
3249                return Ok(YamlLlmExecutionResult {
3250                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
3251                    usage: Some(YamlLlmTokenUsage {
3252                        prompt_tokens: 12,
3253                        completion_tokens: 6,
3254                        total_tokens: 18,
3255                        reasoning_tokens: None,
3256                    }),
3257                    ttft_ms: None,
3258                    tool_calls: Vec::new(),
3259                });
3260            }
3261            if prompt.contains("Determine supply chain subtype") {
3262                return Ok(YamlLlmExecutionResult {
3263                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
3264                    usage: Some(YamlLlmTokenUsage {
3265                        prompt_tokens: 11,
3266                        completion_tokens: 4,
3267                        total_tokens: 15,
3268                        reasoning_tokens: None,
3269                    }),
3270                    ttft_ms: None,
3271                    tool_calls: Vec::new(),
3272                });
3273            }
3274            Err("unexpected prompt".to_string())
3275        }
3276    }
3277
3278    #[tokio::test]
3279    async fn runs_yaml_workflow_and_returns_step_timings() {
3280        let yaml = r#"
3281id: email-intake-classification
3282entry_node: classify_top_level
3283nodes:
3284  - id: classify_top_level
3285    node_type:
3286      llm_call:
3287        model: gpt-4.1
3288    config:
3289      prompt: |
3290        Classify this email into exactly one category:
3291        {{ input.email_text }}
3292  - id: route_top_level
3293    node_type:
3294      switch:
3295        branches:
3296          - condition: '$.nodes.classify_top_level.output.category == "termination"'
3297            target: classify_termination_subtype
3298        default: rag_clarification
3299  - id: classify_termination_subtype
3300    node_type:
3301      llm_call:
3302        model: gpt-4.1
3303    config:
3304      prompt: |
3305        Determine termination subtype:
3306        {{ input.email_text }}
3307  - id: route_termination_subtype
3308    node_type:
3309      switch:
3310        branches:
3311          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
3312            target: rag_termination_repeated_offense
3313        default: rag_clarification
3314  - id: rag_termination_repeated_offense
3315    node_type:
3316      custom_worker:
3317        handler: GetRagData
3318    config:
3319      payload:
3320        topic: termination_repeated_offense
3321  - id: rag_clarification
3322    node_type:
3323      custom_worker:
3324        handler: GetRagData
3325    config:
3326      payload:
3327        topic: clarification
3328edges:
3329  - from: classify_top_level
3330    to: route_top_level
3331  - from: classify_termination_subtype
3332    to: route_termination_subtype
3333"#;
3334
3335        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3336        let custom_worker = FixedToolWorker {
3337            payload: json!({"context": "mock"}),
3338        };
3339        let output = run_email_workflow_yaml_with_custom_worker(
3340            &workflow,
3341            "test",
3342            &MockExecutor,
3343            Some(&custom_worker),
3344        )
3345        .await
3346        .expect("yaml workflow should execute");
3347
3348        assert_eq!(output.workflow_id, "email-intake-classification");
3349        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
3350        assert!(!output.step_timings.is_empty());
3351        assert_eq!(output.step_timings.len(), output.trace.len());
3352        assert!(output
3353            .outputs
3354            .contains_key("rag_termination_repeated_offense"));
3355        assert_eq!(output.total_input_tokens, 22);
3356        assert_eq!(output.total_output_tokens, 11);
3357        assert_eq!(output.total_tokens, 33);
3358    }
3359
3360    #[tokio::test]
3361    async fn workflow_runner_executes_inline_workflow_with_executor() {
3362        let yaml = r#"
3363id: email-intake-classification
3364entry_node: classify_top_level
3365nodes:
3366  - id: classify_top_level
3367    node_type:
3368      llm_call:
3369        model: gpt-4.1
3370    config:
3371      prompt: |
3372        Classify this email into exactly one category:
3373        {{ input.email_text }}
3374"#;
3375
3376        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3377        let output = WorkflowRunner::from_workflow(&workflow)
3378            .with_email_text("test")
3379            .with_executor(&MockExecutor)
3380            .run()
3381            .await
3382            .expect("builder execution should succeed");
3383
3384        assert_eq!(output.workflow_id, "email-intake-classification");
3385        assert!(output.outputs.contains_key("classify_top_level"));
3386    }
3387
3388    #[tokio::test]
3389    async fn workflow_runner_requires_executor() {
3390        let yaml = r#"
3391id: missing-executor
3392entry_node: end
3393nodes:
3394  - id: end
3395    node_type:
3396      end: {}
3397"#;
3398        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3399
3400        let err = WorkflowRunner::from_workflow(&workflow)
3401            .with_input(&json!({"email_text":"x"}))
3402            .run()
3403            .await
3404            .expect_err("missing executor should fail");
3405
3406        assert!(matches!(
3407            err,
3408            YamlWorkflowRunError::InvalidInput { message }
3409                if message.contains("workflow executor is required")
3410        ));
3411    }
3412
3413    #[tokio::test]
3414    async fn emits_resolved_llm_input_event_with_bindings() {
3415        let yaml = r#"
3416id: email-intake-classification
3417entry_node: classify_top_level
3418nodes:
3419  - id: classify_top_level
3420    node_type:
3421      llm_call:
3422        model: gpt-4.1
3423    config:
3424      prompt: |
3425        Classify this email into exactly one category:
3426        {{ input.email_text }}
3427"#;
3428
3429        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3430        let sink = RecordingSink {
3431            events: Mutex::new(Vec::new()),
3432        };
3433
3434        let output = run_email_workflow_yaml_with_custom_worker_and_events(
3435            &workflow,
3436            "Need help with termination",
3437            &MockExecutor,
3438            None,
3439            Some(&sink),
3440        )
3441        .await
3442        .expect("yaml workflow should execute");
3443
3444        assert_eq!(output.terminal_node, "classify_top_level");
3445
3446        let events = sink
3447            .events
3448            .lock()
3449            .expect("recording sink lock should not be poisoned");
3450        let llm_event = events
3451            .iter()
3452            .find(|event| event.event_type == "node_llm_input_resolved")
3453            .expect("expected llm input telemetry event");
3454
3455        let metadata = llm_event
3456            .metadata
3457            .as_ref()
3458            .expect("llm input event must include metadata");
3459        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
3460        assert_eq!(metadata["stream_requested"], Value::Bool(false));
3461        assert_eq!(metadata["heal_requested"], Value::Bool(false));
3462        assert!(metadata["prompt"]
3463            .as_str()
3464            .expect("prompt should be a string")
3465            .contains("Need help with termination"));
3466
3467        let bindings = metadata["bindings"]
3468            .as_array()
3469            .expect("bindings should be an array");
3470        assert_eq!(bindings.len(), 1);
3471        assert_eq!(
3472            bindings[0]["source_path"],
3473            Value::String("input.email_text".to_string())
3474        );
3475        assert_eq!(
3476            bindings[0]["resolved"],
3477            Value::String("Need help with termination".to_string())
3478        );
3479        assert_eq!(bindings[0]["missing"], Value::Bool(false));
3480        assert_eq!(
3481            bindings[0]["resolved_type"],
3482            Value::String("string".to_string())
3483        );
3484    }
3485
3486    #[tokio::test]
3487    async fn workflow_completed_event_includes_nerdstats_by_default() {
3488        let yaml = r#"
3489id: nerdstats-default
3490entry_node: classify
3491nodes:
3492  - id: classify
3493    node_type:
3494      llm_call:
3495        model: gpt-4.1
3496    config:
3497      prompt: |
3498        Classify this email into exactly one category:
3499        {{ input.email_text }}
3500"#;
3501
3502        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3503        let sink = RecordingSink {
3504            events: Mutex::new(Vec::new()),
3505        };
3506
3507        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
3508            &workflow,
3509            &json!({"email_text":"hello"}),
3510            &MockExecutor,
3511            None,
3512            Some(&sink),
3513            &YamlWorkflowRunOptions::default(),
3514        )
3515        .await
3516        .expect("workflow should execute");
3517
3518        let events = sink
3519            .events
3520            .lock()
3521            .expect("recording sink lock should not be poisoned");
3522        let completed = events
3523            .iter()
3524            .find(|event| event.event_type == "workflow_completed")
3525            .expect("expected workflow_completed event");
3526        let metadata = completed
3527            .metadata
3528            .as_ref()
3529            .expect("workflow_completed should include metadata by default");
3530        let nerdstats = metadata
3531            .get("nerdstats")
3532            .expect("nerdstats should be present by default");
3533
3534        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
3535        assert_eq!(
3536            nerdstats["terminal_node"],
3537            Value::String(output.terminal_node)
3538        );
3539        assert_eq!(
3540            nerdstats["total_tokens"],
3541            Value::Number(output.total_tokens.into())
3542        );
3543        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
3544        assert_eq!(
3545            nerdstats["token_metrics_source"],
3546            Value::String("provider_usage".to_string())
3547        );
3548        assert!(nerdstats.get("step_timings").is_none());
3549        assert!(nerdstats.get("llm_node_metrics").is_none());
3550        assert!(nerdstats.get("step_details").is_some());
3551        assert!(nerdstats.get("llm_node_models").is_none());
3552        assert_eq!(
3553            nerdstats["step_details"][0]["model_name"],
3554            Value::String("gpt-4.1".to_string())
3555        );
3556        assert_eq!(
3557            nerdstats["step_details"][0]["node_id"],
3558            Value::String("classify".to_string())
3559        );
3560        assert_eq!(nerdstats["ttft_ms"], Value::Null);
3561    }
3562
3563    #[tokio::test]
3564    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
3565        let yaml = r#"
3566id: nerdstats-disabled
3567entry_node: classify
3568nodes:
3569  - id: classify
3570    node_type:
3571      llm_call:
3572        model: gpt-4.1
3573    config:
3574      prompt: |
3575        Classify this email into exactly one category:
3576        {{ input.email_text }}
3577"#;
3578
3579        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3580        let sink = RecordingSink {
3581            events: Mutex::new(Vec::new()),
3582        };
3583        let options = YamlWorkflowRunOptions {
3584            telemetry: YamlWorkflowTelemetryConfig {
3585                nerdstats: false,
3586                ..YamlWorkflowTelemetryConfig::default()
3587            },
3588            ..YamlWorkflowRunOptions::default()
3589        };
3590
3591        run_workflow_yaml_with_custom_worker_and_events_and_options(
3592            &workflow,
3593            &json!({"email_text":"hello"}),
3594            &MockExecutor,
3595            None,
3596            Some(&sink),
3597            &options,
3598        )
3599        .await
3600        .expect("workflow should execute");
3601
3602        let events = sink
3603            .events
3604            .lock()
3605            .expect("recording sink lock should not be poisoned");
3606        let completed = events
3607            .iter()
3608            .find(|event| event.event_type == "workflow_completed")
3609            .expect("expected workflow_completed event");
3610        assert!(completed.metadata.is_none());
3611    }
3612
3613    struct StreamAwareMockExecutor;
3614
3615    #[async_trait]
3616    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
3617        async fn complete_structured(
3618            &self,
3619            request: YamlLlmExecutionRequest,
3620            _event_sink: Option<&dyn YamlWorkflowEventSink>,
3621        ) -> Result<YamlLlmExecutionResult, String> {
3622            Ok(YamlLlmExecutionResult {
3623                payload: json!({"state":"ok"}),
3624                usage: Some(YamlLlmTokenUsage {
3625                    prompt_tokens: 20,
3626                    completion_tokens: 10,
3627                    total_tokens: 30,
3628                    reasoning_tokens: None,
3629                }),
3630                ttft_ms: if request.stream { Some(12) } else { None },
3631                tool_calls: Vec::new(),
3632            })
3633        }
3634    }
3635
3636    #[tokio::test]
3637    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
3638        let yaml = r#"
3639id: nerdstats-streaming
3640entry_node: classify
3641nodes:
3642  - id: classify
3643    node_type:
3644      llm_call:
3645        model: gpt-4.1
3646        stream: true
3647    config:
3648      prompt: |
3649        Return JSON only:
3650        {"state":"ok"}
3651"#;
3652
3653        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3654        let sink = RecordingSink {
3655            events: Mutex::new(Vec::new()),
3656        };
3657
3658        run_workflow_yaml_with_custom_worker_and_events_and_options(
3659            &workflow,
3660            &json!({"email_text":"hello"}),
3661            &StreamAwareMockExecutor,
3662            None,
3663            Some(&sink),
3664            &YamlWorkflowRunOptions::default(),
3665        )
3666        .await
3667        .expect("workflow should execute");
3668
3669        let events = sink
3670            .events
3671            .lock()
3672            .expect("recording sink lock should not be poisoned");
3673        let completed = events
3674            .iter()
3675            .find(|event| event.event_type == "workflow_completed")
3676            .expect("expected workflow_completed event");
3677        let metadata = completed
3678            .metadata
3679            .as_ref()
3680            .expect("workflow_completed should include metadata by default");
3681        let nerdstats = metadata
3682            .get("nerdstats")
3683            .expect("nerdstats should be present by default");
3684
3685        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
3686        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
3687        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
3688        assert!(nerdstats.get("step_timings").is_none());
3689        assert!(nerdstats.get("llm_node_metrics").is_none());
3690        assert_eq!(
3691            nerdstats["step_details"][0]["model_name"],
3692            Value::String("gpt-4.1".to_string())
3693        );
3694        assert_eq!(
3695            nerdstats["step_details"][0]["node_id"],
3696            Value::String("classify".to_string())
3697        );
3698    }
3699
3700    #[test]
3701    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
3702        let output = YamlWorkflowRunOutput {
3703            workflow_id: "workflow".to_string(),
3704            entry_node: "start".to_string(),
3705            email_text: "hello".to_string(),
3706            trace: vec!["llm_node".to_string()],
3707            outputs: BTreeMap::new(),
3708            terminal_node: "llm_node".to_string(),
3709            terminal_output: None,
3710            step_timings: vec![YamlStepTiming {
3711                node_id: "llm_node".to_string(),
3712                node_kind: "llm_call".to_string(),
3713                model_name: Some("gpt-4.1".to_string()),
3714                elapsed_ms: 100,
3715                prompt_tokens: None,
3716                completion_tokens: None,
3717                total_tokens: None,
3718                reasoning_tokens: None,
3719                tokens_per_second: None,
3720            }],
3721            llm_node_metrics: BTreeMap::new(),
3722            llm_node_models: BTreeMap::new(),
3723            total_elapsed_ms: 100,
3724            ttft_ms: None,
3725            total_input_tokens: 0,
3726            total_output_tokens: 0,
3727            total_tokens: 0,
3728            total_reasoning_tokens: None,
3729            tokens_per_second: 0.0,
3730            trace_id: Some("trace-1".to_string()),
3731            metadata: None,
3732        };
3733
3734        let nerdstats = workflow_nerdstats(&output);
3735        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
3736        assert_eq!(
3737            nerdstats["token_metrics_source"],
3738            Value::String("provider_stream_usage_unavailable".to_string())
3739        );
3740        assert_eq!(nerdstats["total_tokens"], Value::Null);
3741        assert_eq!(nerdstats["ttft_ms"], Value::Null);
3742        assert!(nerdstats.get("step_timings").is_none());
3743        assert!(nerdstats.get("llm_node_metrics").is_none());
3744        assert_eq!(
3745            nerdstats["step_details"][0]["node_id"],
3746            Value::String("llm_node".to_string())
3747        );
3748        assert_eq!(
3749            nerdstats["step_details"][0]["model_name"],
3750            Value::String("gpt-4.1".to_string())
3751        );
3752    }
3753
3754    #[test]
3755    fn workflow_nerdstats_includes_ttft_when_available() {
3756        let output = YamlWorkflowRunOutput {
3757            workflow_id: "workflow".to_string(),
3758            entry_node: "start".to_string(),
3759            email_text: "hello".to_string(),
3760            trace: vec!["llm_node".to_string()],
3761            outputs: BTreeMap::new(),
3762            terminal_node: "llm_node".to_string(),
3763            terminal_output: None,
3764            step_timings: vec![YamlStepTiming {
3765                node_id: "llm_node".to_string(),
3766                node_kind: "llm_call".to_string(),
3767                model_name: Some("gpt-4.1".to_string()),
3768                elapsed_ms: 100,
3769                prompt_tokens: Some(10),
3770                completion_tokens: Some(15),
3771                total_tokens: Some(25),
3772                reasoning_tokens: None,
3773                tokens_per_second: Some(150.0),
3774            }],
3775            llm_node_metrics: BTreeMap::new(),
3776            llm_node_models: BTreeMap::new(),
3777            total_elapsed_ms: 100,
3778            ttft_ms: Some(42),
3779            total_input_tokens: 10,
3780            total_output_tokens: 15,
3781            total_tokens: 25,
3782            total_reasoning_tokens: None,
3783            tokens_per_second: 150.0,
3784            trace_id: Some("trace-2".to_string()),
3785            metadata: None,
3786        };
3787
3788        let nerdstats = workflow_nerdstats(&output);
3789        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
3790        assert!(nerdstats.get("step_timings").is_none());
3791        assert!(nerdstats.get("llm_node_metrics").is_none());
3792        assert_eq!(
3793            nerdstats["step_details"][0]["node_id"],
3794            Value::String("llm_node".to_string())
3795        );
3796        assert_eq!(
3797            nerdstats["step_details"][0]["model_name"],
3798            Value::String("gpt-4.1".to_string())
3799        );
3800    }
3801
3802    #[test]
3803    fn workflow_nerdstats_schema_contract_is_stable() {
3804        let output = YamlWorkflowRunOutput {
3805            workflow_id: "schema-workflow".to_string(),
3806            entry_node: "start".to_string(),
3807            email_text: "hello".to_string(),
3808            trace: vec!["classify".to_string(), "route".to_string()],
3809            outputs: BTreeMap::new(),
3810            terminal_node: "route".to_string(),
3811            terminal_output: None,
3812            step_timings: vec![
3813                YamlStepTiming {
3814                    node_id: "classify".to_string(),
3815                    node_kind: "llm_call".to_string(),
3816                    model_name: Some("gpt-4.1".to_string()),
3817                    elapsed_ms: 100,
3818                    prompt_tokens: Some(11),
3819                    completion_tokens: Some(22),
3820                    total_tokens: Some(33),
3821                    reasoning_tokens: Some(7),
3822                    tokens_per_second: Some(220.0),
3823                },
3824                YamlStepTiming {
3825                    node_id: "route".to_string(),
3826                    node_kind: "switch".to_string(),
3827                    model_name: None,
3828                    elapsed_ms: 0,
3829                    prompt_tokens: None,
3830                    completion_tokens: None,
3831                    total_tokens: None,
3832                    reasoning_tokens: None,
3833                    tokens_per_second: None,
3834                },
3835            ],
3836            llm_node_metrics: BTreeMap::new(),
3837            llm_node_models: BTreeMap::new(),
3838            total_elapsed_ms: 100,
3839            ttft_ms: Some(9),
3840            total_input_tokens: 11,
3841            total_output_tokens: 22,
3842            total_tokens: 33,
3843            total_reasoning_tokens: Some(7),
3844            tokens_per_second: 220.0,
3845            trace_id: Some("trace-schema".to_string()),
3846            metadata: None,
3847        };
3848
3849        let nerdstats = workflow_nerdstats(&output);
3850        let expected = json!({
3851            "workflow_id": "schema-workflow",
3852            "terminal_node": "route",
3853            "total_elapsed_ms": 100,
3854            "ttft_ms": 9,
3855            "step_details": [
3856                {
3857                    "node_id": "classify",
3858                    "node_kind": "llm_call",
3859                    "model_name": "gpt-4.1",
3860                    "elapsed_ms": 100,
3861                    "prompt_tokens": 11,
3862                    "completion_tokens": 22,
3863                    "total_tokens": 33,
3864                    "reasoning_tokens": 7,
3865                    "tokens_per_second": 220.0
3866                },
3867                {
3868                    "node_id": "route",
3869                    "node_kind": "switch",
3870                    "elapsed_ms": 0
3871                }
3872            ],
3873            "total_input_tokens": 11,
3874            "total_output_tokens": 22,
3875            "total_tokens": 33,
3876            "total_reasoning_tokens": 7,
3877            "tokens_per_second": 220.0,
3878            "trace_id": "trace-schema",
3879            "token_metrics_available": true,
3880            "token_metrics_source": "provider_usage",
3881            "llm_nodes_without_usage": []
3882        });
3883
3884        assert_eq!(nerdstats, expected);
3885    }
3886
3887    struct MessageHistoryExecutor;
3888
3889    #[async_trait]
3890    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
3891        async fn complete_structured(
3892            &self,
3893            request: YamlLlmExecutionRequest,
3894            _event_sink: Option<&dyn YamlWorkflowEventSink>,
3895        ) -> Result<YamlLlmExecutionResult, String> {
3896            let messages = request
3897                .messages
3898                .ok_or_else(|| "expected messages in request".to_string())?;
3899            if messages.len() != 2 {
3900                return Err(format!("expected 2 messages, got {}", messages.len()));
3901            }
3902            Ok(YamlLlmExecutionResult {
3903                payload: json!({"category":"termination","reason":"history"}),
3904                usage: Some(YamlLlmTokenUsage {
3905                    prompt_tokens: 7,
3906                    completion_tokens: 3,
3907                    total_tokens: 10,
3908                    reasoning_tokens: None,
3909                }),
3910                ttft_ms: None,
3911                tool_calls: Vec::new(),
3912            })
3913        }
3914    }
3915
3916    #[tokio::test]
3917    async fn supports_messages_path_in_workflow_input() {
3918        let yaml = r#"
3919id: email-intake-classification
3920entry_node: classify_top_level
3921nodes:
3922  - id: classify_top_level
3923    node_type:
3924      llm_call:
3925        model: gpt-4.1
3926        messages_path: input.messages
3927        append_prompt_as_user: false
3928"#;
3929
3930        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3931        let input = json!({
3932            "email_text": "ignored",
3933            "messages": [
3934                {"role": "system", "content": "You are a classifier"},
3935                {"role": "user", "content": "Please classify this"}
3936            ]
3937        });
3938
3939        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
3940            .await
3941            .expect("workflow should use chat history from input");
3942
3943        assert_eq!(output.terminal_node, "classify_top_level");
3944        assert_eq!(
3945            output.outputs["classify_top_level"]["output"]["reason"],
3946            Value::String("history".to_string())
3947        );
3948    }
3949
3950    #[tokio::test]
3951    async fn wrapper_entrypoints_produce_equivalent_outputs() {
3952        let yaml = r#"
3953id: wrapper-equivalence
3954entry_node: classify
3955nodes:
3956  - id: classify
3957    node_type:
3958      llm_call:
3959        model: gpt-4.1
3960    config:
3961      prompt: |
3962        Classify this email into exactly one category:
3963        {{ input.email_text }}
3964"#;
3965
3966        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3967        let input = json!({"email_text":"hello"});
3968
3969        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
3970            .await
3971            .expect("base entrypoint should execute");
3972        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
3973            .await
3974            .expect("custom worker wrapper should execute");
3975        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
3976            &workflow,
3977            &input,
3978            &MockExecutor,
3979            None,
3980            None,
3981            &YamlWorkflowRunOptions::default(),
3982        )
3983        .await
3984        .expect("events/options wrapper should execute");
3985
3986        assert_eq!(a.workflow_id, b.workflow_id);
3987        assert_eq!(a.workflow_id, c.workflow_id);
3988        assert_eq!(a.terminal_node, b.terminal_node);
3989        assert_eq!(a.terminal_node, c.terminal_node);
3990        assert_eq!(a.outputs, b.outputs);
3991        assert_eq!(a.outputs, c.outputs);
3992        assert_eq!(a.total_tokens, b.total_tokens);
3993        assert_eq!(a.total_tokens, c.total_tokens);
3994    }
3995
3996    #[tokio::test]
3997    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
3998        let yaml = r#"
3999id: tool-calling-workflow
4000entry_node: generate_with_tool
4001nodes:
4002  - id: generate_with_tool
4003    node_type:
4004      llm_call:
4005        model: gpt-4.1
4006        tools_format: simplified
4007        max_tool_roundtrips: 1
4008        tool_calls_global_key: audit
4009        tools:
4010          - name: get_customer_context
4011            description: Fetch customer context
4012            input_schema:
4013              type: object
4014              properties:
4015                order_id: { type: string }
4016              required: [order_id]
4017              additionalProperties: false
4018            output_schema:
4019              type: object
4020              properties:
4021                customer_name: { type: string }
4022              required: [customer_name]
4023              additionalProperties: false
4024    config:
4025      output_schema:
4026        type: object
4027        properties:
4028          state: { type: string }
4029        required: [state]
4030  - id: personalize
4031    node_type:
4032      llm_call:
4033        model: gpt-4.1
4034    config:
4035      prompt: |
4036        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
4037      output_schema:
4038        type: object
4039        properties:
4040          subject: { type: string }
4041          body: { type: string }
4042        required: [subject, body]
4043edges:
4044  - from: generate_with_tool
4045    to: personalize
4046"#;
4047
4048        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4049        let client = SimpleAgentsClientBuilder::new()
4050            .with_provider(Arc::new(ToolLoopProvider))
4051            .build()
4052            .expect("client should build");
4053        let worker = FixedToolWorker {
4054            payload: json!({"customer_name": "Ava"}),
4055        };
4056
4057        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4058            &workflow,
4059            &json!({"email_text":"hello"}),
4060            &client,
4061            Some(&worker),
4062            None,
4063            &YamlWorkflowRunOptions::default(),
4064        )
4065        .await
4066        .expect("workflow should execute");
4067
4068        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
4069        assert_eq!(
4070            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
4071            Value::String("Ava".to_string())
4072        );
4073        let body = output.outputs["personalize"]["output"]["body"]
4074            .as_str()
4075            .expect("body should be string");
4076        assert!(body.contains("Ava"));
4077    }
4078
4079    #[tokio::test]
4080    async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
4081        let yaml = r#"
4082id: reasoning-usage-workflow
4083entry_node: classify
4084nodes:
4085  - id: classify
4086    node_type:
4087      llm_call:
4088        model: gpt-4.1
4089    config:
4090      prompt: |
4091        Return JSON only:
4092        {"state":"ok"}
4093      output_schema:
4094        type: object
4095        properties:
4096          state: { type: string }
4097        required: [state]
4098"#;
4099
4100        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4101        let client = SimpleAgentsClientBuilder::new()
4102            .with_provider(Arc::new(ReasoningUsageProvider))
4103            .build()
4104            .expect("client should build");
4105
4106        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4107            &workflow,
4108            &json!({"email_text":"hello"}),
4109            &client,
4110            None,
4111            None,
4112            &YamlWorkflowRunOptions::default(),
4113        )
4114        .await
4115        .expect("workflow should execute");
4116
4117        assert_eq!(output.total_reasoning_tokens, Some(4));
4118        assert_eq!(output.step_timings.len(), 1);
4119        assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));
4120
4121        let nerdstats = workflow_nerdstats(&output);
4122        assert_eq!(
4123            nerdstats["total_reasoning_tokens"],
4124            Value::Number(4u64.into())
4125        );
4126        assert_eq!(
4127            nerdstats["step_details"][0]["reasoning_tokens"],
4128            Value::Number(4u64.into())
4129        );
4130    }
4131
4132    #[tokio::test]
4133    async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
4134        let yaml = r#"
4135id: tool-reasoning-workflow
4136entry_node: generate_with_tool
4137nodes:
4138  - id: generate_with_tool
4139    node_type:
4140      llm_call:
4141        model: gpt-4.1
4142        tools_format: simplified
4143        max_tool_roundtrips: 1
4144        tools:
4145          - name: get_customer_context
4146            input_schema:
4147              type: object
4148              properties:
4149                order_id: { type: string }
4150              required: [order_id]
4151              additionalProperties: false
4152            output_schema:
4153              type: object
4154              properties:
4155                customer_name: { type: string }
4156              required: [customer_name]
4157              additionalProperties: false
4158    config:
4159      output_schema:
4160        type: object
4161        properties:
4162          state: { type: string }
4163        required: [state]
4164"#;
4165
4166        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4167        let client = SimpleAgentsClientBuilder::new()
4168            .with_provider(Arc::new(ToolLoopReasoningProvider))
4169            .build()
4170            .expect("client should build");
4171        let worker = FixedToolWorker {
4172            payload: json!({"customer_name": "Ava"}),
4173        };
4174
4175        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4176            &workflow,
4177            &json!({"email_text":"hello"}),
4178            &client,
4179            Some(&worker),
4180            None,
4181            &YamlWorkflowRunOptions::default(),
4182        )
4183        .await
4184        .expect("workflow should execute");
4185
4186        assert_eq!(output.total_reasoning_tokens, Some(5));
4187        assert_eq!(output.step_timings.len(), 1);
4188        assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));
4189
4190        let nerdstats = workflow_nerdstats(&output);
4191        assert_eq!(
4192            nerdstats["total_reasoning_tokens"],
4193            Value::Number(5u64.into())
4194        );
4195        assert_eq!(
4196            nerdstats["step_details"][0]["reasoning_tokens"],
4197            Value::Number(5u64.into())
4198        );
4199    }
4200
4201    #[tokio::test]
4202    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
4203        let yaml = r#"
4204id: tool-calling-schema-fail
4205entry_node: generate_with_tool
4206nodes:
4207  - id: generate_with_tool
4208    node_type:
4209      llm_call:
4210        model: gpt-4.1
4211        tools_format: simplified
4212        max_tool_roundtrips: 1
4213        tools:
4214          - name: get_customer_context
4215            input_schema:
4216              type: object
4217              properties:
4218                order_id: { type: string }
4219              required: [order_id]
4220              additionalProperties: false
4221            output_schema:
4222              type: object
4223              properties:
4224                customer_name: { type: string }
4225              required: [customer_name]
4226              additionalProperties: false
4227    config:
4228      output_schema:
4229        type: object
4230        properties:
4231          state: { type: string }
4232        required: [state]
4233"#;
4234
4235        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4236        let client = SimpleAgentsClientBuilder::new()
4237            .with_provider(Arc::new(ToolLoopProvider))
4238            .build()
4239            .expect("client should build");
4240        let worker = FixedToolWorker {
4241            payload: json!({"unexpected": "shape"}),
4242        };
4243
4244        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4245            &workflow,
4246            &json!({"email_text":"hello"}),
4247            &client,
4248            Some(&worker),
4249            None,
4250            &YamlWorkflowRunOptions::default(),
4251        )
4252        .await
4253        .expect_err("workflow should hard-fail on schema mismatch");
4254
4255        match error {
4256            YamlWorkflowRunError::Llm { message, .. } => {
4257                assert!(message.contains("output failed schema validation"));
4258            }
4259            other => panic!("expected llm error, got {other:?}"),
4260        }
4261    }
4262
4263    #[tokio::test]
4264    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
4265        let yaml = r#"
4266id: unknown-tool-rejected
4267entry_node: generate_with_tool
4268nodes:
4269  - id: generate_with_tool
4270    node_type:
4271      llm_call:
4272        model: gpt-4.1
4273        tools_format: simplified
4274        max_tool_roundtrips: 1
4275        tools:
4276          - name: get_customer_context
4277            input_schema:
4278              type: object
4279              properties:
4280                order_id: { type: string }
4281              required: [order_id]
4282    config:
4283      output_schema:
4284        type: object
4285        properties:
4286          state: { type: string }
4287        required: [state]
4288"#;
4289
4290        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4291        let client = SimpleAgentsClientBuilder::new()
4292            .with_provider(Arc::new(UnknownToolProvider))
4293            .build()
4294            .expect("client should build");
4295        let worker = CountingToolWorker {
4296            execute_calls: AtomicUsize::new(0),
4297        };
4298
4299        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4300            &workflow,
4301            &json!({"email_text":"hello"}),
4302            &client,
4303            Some(&worker),
4304            None,
4305            &YamlWorkflowRunOptions::default(),
4306        )
4307        .await
4308        .expect_err("workflow should reject unknown tool before executing worker");
4309
4310        match error {
4311            YamlWorkflowRunError::Llm { message, .. } => {
4312                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
4313            }
4314            other => panic!("expected llm error, got {other:?}"),
4315        }
4316
4317        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
4318    }
4319
4320    #[test]
4321    fn validates_tools_format_mismatch() {
4322        let yaml = r#"
4323id: mismatch
4324entry_node: generate
4325nodes:
4326  - id: generate
4327    node_type:
4328      llm_call:
4329        model: gpt-4.1
4330        tools_format: openai
4331        tools:
4332          - name: get_customer_context
4333            input_schema:
4334              type: object
4335              properties:
4336                order_id: { type: string }
4337              required: [order_id]
4338"#;
4339
4340        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4341        let diagnostics = verify_yaml_workflow(&workflow);
4342        assert!(diagnostics
4343            .iter()
4344            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
4345    }
4346
4347    #[tokio::test]
4348    async fn custom_worker_node_requires_executor() {
4349        let yaml = r#"
4350id: missing-custom-worker
4351entry_node: enrich
4352nodes:
4353  - id: enrich
4354    node_type:
4355      custom_worker:
4356        handler: GetEmployeeRecord
4357"#;
4358
4359        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4360        let client = SimpleAgentsClientBuilder::new()
4361            .with_provider(Arc::new(ToolLoopProvider))
4362            .build()
4363            .expect("client should build");
4364
4365        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4366            &workflow,
4367            &json!({"email_text": "hello"}),
4368            &client,
4369            None,
4370            None,
4371            &YamlWorkflowRunOptions::default(),
4372        )
4373        .await
4374        .expect_err("workflow should fail without custom worker executor");
4375
4376        match error {
4377            YamlWorkflowRunError::CustomWorker { node_id, message } => {
4378                assert_eq!(node_id, "enrich");
4379                assert!(message.contains("requires a configured custom worker executor"));
4380            }
4381            other => panic!("expected custom worker error, got {other:?}"),
4382        }
4383    }
4384
4385    #[tokio::test]
4386    async fn custom_worker_receives_trace_context_block() {
4387        let yaml = r#"
4388id: custom-worker-trace-context
4389entry_node: lookup
4390nodes:
4391  - id: lookup
4392    node_type:
4393      custom_worker:
4394        handler: GetRagData
4395    config:
4396      payload:
4397        topic: demo
4398"#;
4399
4400        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4401        let worker = CapturingWorker {
4402            context: Mutex::new(None),
4403        };
4404        let options = YamlWorkflowRunOptions {
4405            trace: YamlWorkflowTraceOptions {
4406                context: Some(YamlWorkflowTraceContextInput {
4407                    trace_id: Some("trace-fixed-ctx".to_string()),
4408                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
4409                    ..YamlWorkflowTraceContextInput::default()
4410                }),
4411                tenant: YamlWorkflowTraceTenantContext {
4412                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
4413                    request_id: Some("turn-7".to_string()),
4414                    ..YamlWorkflowTraceTenantContext::default()
4415                },
4416            },
4417            ..YamlWorkflowRunOptions::default()
4418        };
4419
4420        run_workflow_yaml_with_custom_worker_and_events_and_options(
4421            &workflow,
4422            &json!({"email_text":"hello"}),
4423            &MockExecutor,
4424            Some(&worker),
4425            None,
4426            &options,
4427        )
4428        .await
4429        .expect("workflow should execute");
4430
4431        let captured_context = worker
4432            .context
4433            .lock()
4434            .expect("capturing worker lock should not be poisoned")
4435            .clone()
4436            .expect("custom worker should receive context");
4437
4438        assert_eq!(
4439            captured_context
4440                .get("trace")
4441                .and_then(|trace| trace.get("context"))
4442                .and_then(|context| context.get("trace_id"))
4443                .and_then(Value::as_str),
4444            Some("trace-fixed-ctx")
4445        );
4446        assert_eq!(
4447            captured_context
4448                .get("trace")
4449                .and_then(|trace| trace.get("context"))
4450                .and_then(|context| context.get("traceparent"))
4451                .and_then(Value::as_str),
4452            Some("00-trace-fixed-ctx-span-fixed-01")
4453        );
4454        assert_eq!(
4455            captured_context
4456                .get("trace")
4457                .and_then(|trace| trace.get("tenant"))
4458                .and_then(|tenant| tenant.get("conversation_id"))
4459                .and_then(Value::as_str),
4460            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
4461        );
4462    }
4463
4464    #[tokio::test]
4465    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
4466        let yaml = r#"
4467id: cancellation-test
4468entry_node: classify
4469nodes:
4470  - id: classify
4471    node_type:
4472      llm_call:
4473        model: gpt-4.1
4474    config:
4475      prompt: |
4476        Classify this email into exactly one category:
4477        {{ input.email_text }}
4478"#;
4479
4480        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4481        let executor = CountingExecutor {
4482            call_count: AtomicUsize::new(0),
4483        };
4484        let sink = CancelAfterFirstEventSink {
4485            cancelled: AtomicBool::new(false),
4486        };
4487
4488        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
4489            &workflow,
4490            &json!({"email_text":"hello"}),
4491            &executor,
4492            None,
4493            Some(&sink),
4494            &YamlWorkflowRunOptions::default(),
4495        )
4496        .await
4497        .expect_err("workflow should stop when sink signals cancellation");
4498
4499        assert!(matches!(
4500            err,
4501            YamlWorkflowRunError::EventSinkCancelled { .. }
4502        ));
4503        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
4504    }
4505
4506    #[tokio::test]
4507    async fn rejects_invalid_messages_path_shape() {
4508        let yaml = r#"
4509id: email-intake-classification
4510entry_node: classify_top_level
4511nodes:
4512  - id: classify_top_level
4513    node_type:
4514      llm_call:
4515        model: gpt-4.1
4516        messages_path: input.messages
4517"#;
4518
4519        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4520        let input = json!({
4521            "email_text": "ignored",
4522            "messages": "not-a-list"
4523        });
4524
4525        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
4526            .await
4527            .expect_err("workflow should fail for invalid messages shape");
4528
4529        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
4530    }
4531
4532    #[test]
4533    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
4534        let yaml = r#"
4535id: chat-workflow
4536entry_node: decide
4537nodes:
4538  - id: decide
4539    node_type:
4540      switch:
4541        branches:
4542          - condition: '$.input.mode == "draft"'
4543            target: draft
4544        default: ask
4545  - id: draft
4546    node_type:
4547      llm_call:
4548        model: gpt-4.1
4549  - id: ask
4550    node_type:
4551      llm_call:
4552        model: gpt-4.1
4553edges:
4554  - from: draft
4555    to: ask
4556"#;
4557
4558        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4559        let mermaid = yaml_workflow_to_mermaid(&workflow);
4560
4561        assert!(mermaid.contains("flowchart TD"));
4562        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
4563        assert!(mermaid.contains("decide -- \"default\" --> ask"));
4564        assert!(mermaid.contains("draft --> ask"));
4565    }
4566
4567    #[test]
4568    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
4569        let yaml = r#"
4570id: tool-graph
4571entry_node: chat
4572nodes:
4573  - id: chat
4574    node_type:
4575      llm_call:
4576        model: gemini-3-flash
4577        tools_format: simplified
4578        tools:
4579          - name: run_workflow_graph
4580            input_schema:
4581              type: object
4582              properties:
4583                workflow_id: { type: string }
4584              required: [workflow_id]
4585edges: []
4586"#;
4587
4588        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4589        let mermaid = yaml_workflow_to_mermaid(&workflow);
4590
4591        assert!(mermaid.contains("chat__tool_0"));
4592        assert!(mermaid.contains("chat -.-> chat__tool_0"));
4593        assert!(mermaid.contains("classDef toolNode"));
4594        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
4595    }
4596
4597    #[test]
4598    fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
4599        let base_dir = std::env::temp_dir().join(format!(
4600            "simple_agents_mermaid_subgraph_{}",
4601            std::time::SystemTime::now()
4602                .duration_since(std::time::UNIX_EPOCH)
4603                .expect("unix epoch")
4604                .as_nanos()
4605        ));
4606        fs::create_dir_all(&base_dir).expect("temp dir should be created");
4607
4608        let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
4609        let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
4610
4611        let orchestrator_yaml = r#"
4612id: email-chat-orchestrator-with-subgraph-tool
4613entry_node: respond_casual
4614nodes:
4615  - id: respond_casual
4616    node_type:
4617      llm_call:
4618        model: gemini-3-flash
4619        tools_format: simplified
4620        tools:
4621          - name: run_workflow_graph
4622            input_schema:
4623              type: object
4624              properties:
4625                workflow_id: { type: string }
4626              required: [workflow_id]
4627    config:
4628      prompt: |
4629        Call with:
4630        {
4631          "workflow_id": "hr_warning_email_subgraph"
4632        }
4633edges: []
4634"#;
4635
4636        let subgraph_yaml = r#"
4637id: hr-warning-email-subgraph
4638entry_node: draft_hr_warning_email
4639nodes:
4640  - id: draft_hr_warning_email
4641    node_type:
4642      llm_call:
4643        model: gemini-3-flash
4644edges: []
4645"#;
4646
4647        fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
4648        fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
4649
4650        let mermaid =
4651            yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
4652
4653        assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
4654        assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
4655        assert!(mermaid.contains("calls hr_warning_email_subgraph"));
4656        assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
4657
4658        fs::remove_dir_all(base_dir).expect("temp dir removed");
4659    }
4660
4661    #[tokio::test]
4662    async fn rejects_non_yaml_workflow_file_extension() {
4663        let file_path = std::env::temp_dir().join(format!(
4664            "simple_agents_workflow_{}.txt",
4665            std::time::SystemTime::now()
4666                .duration_since(std::time::UNIX_EPOCH)
4667                .expect("unix epoch")
4668                .as_nanos()
4669        ));
4670        fs::write(&file_path, "not yaml").expect("temporary file should be created");
4671
4672        let result =
4673            run_workflow_yaml_file(&file_path, &json!({"email_text": "hello"}), &MockExecutor)
4674                .await;
4675
4676        match result {
4677            Err(YamlWorkflowRunError::FileRejected { path, reason }) => {
4678                assert!(path.ends_with(".txt"));
4679                assert!(reason.contains(".yaml or .yml"));
4680            }
4681            other => panic!("expected file rejection error, got {other:?}"),
4682        }
4683
4684        let _ = fs::remove_file(&file_path);
4685    }
4686
4687    #[test]
4688    fn converts_yaml_workflow_to_ir_definition() {
4689        let yaml = r#"
4690id: chat-workflow
4691entry_node: classify
4692nodes:
4693  - id: classify
4694    node_type:
4695      llm_call:
4696        model: gpt-4.1
4697    config:
4698      prompt: |
4699        classify
4700  - id: route
4701    node_type:
4702      switch:
4703        branches:
4704          - condition: '$.nodes.classify.output.kind == "x"'
4705            target: done
4706        default: done
4707  - id: done
4708    node_type:
4709      custom_worker:
4710        handler: GetRagData
4711    config:
4712      payload:
4713        topic: test
4714edges:
4715  - from: classify
4716    to: route
4717"#;
4718
4719        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4720        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
4721
4722        assert_eq!(ir.name, "chat-workflow");
4723        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
4724        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
4725        assert!(ir.nodes.iter().any(|n| n.id == "route"));
4726        assert!(ir.nodes.iter().any(|n| n.id == "done"));
4727    }
4728
4729    #[test]
4730    fn supports_yaml_to_ir_when_messages_path_is_used() {
4731        let yaml = r#"
4732id: chat-workflow
4733entry_node: classify
4734nodes:
4735  - id: classify
4736    node_type:
4737      llm_call:
4738        model: gpt-4.1
4739        messages_path: input.messages
4740"#;
4741
4742        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4743        let ir =
4744            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
4745        assert!(ir.nodes.iter().any(|node| matches!(
4746            node.kind,
4747            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
4748        )));
4749    }
4750
4751    #[tokio::test]
4752    async fn workflow_output_contains_trace_id_in_both_locations() {
4753        let yaml = r#"
4754id: trace-test
4755entry_node: classify
4756nodes:
4757  - id: classify
4758    node_type:
4759      llm_call:
4760        model: gpt-4.1
4761    config:
4762      prompt: |
4763        Classify this email into exactly one category:
4764        {{ input.email_text }}
4765"#;
4766
4767        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4768        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
4769            .await
4770            .expect("workflow should execute");
4771
4772        let trace_id = output
4773            .trace_id
4774            .as_deref()
4775            .expect("trace_id should be present");
4776        assert!(!trace_id.is_empty());
4777        assert_eq!(
4778            output.metadata.as_ref().and_then(|value| {
4779                value
4780                    .get("telemetry")
4781                    .and_then(|telemetry| telemetry.get("trace_id"))
4782                    .and_then(Value::as_str)
4783            }),
4784            Some(trace_id)
4785        );
4786    }
4787
4788    #[tokio::test]
4789    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
4790        let yaml = r#"
4791id: sample-rate-zero
4792entry_node: classify
4793nodes:
4794  - id: classify
4795    node_type:
4796      llm_call:
4797        model: gpt-4.1
4798    config:
4799      prompt: |
4800        Classify this email into exactly one category:
4801        {{ input.email_text }}
4802"#;
4803
4804        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4805        let options = YamlWorkflowRunOptions {
4806            telemetry: YamlWorkflowTelemetryConfig {
4807                sample_rate: 0.0,
4808                ..YamlWorkflowTelemetryConfig::default()
4809            },
4810            trace: YamlWorkflowTraceOptions {
4811                context: Some(YamlWorkflowTraceContextInput {
4812                    trace_id: Some("trace-sample-zero".to_string()),
4813                    ..YamlWorkflowTraceContextInput::default()
4814                }),
4815                tenant: YamlWorkflowTraceTenantContext::default(),
4816            },
4817            model: None,
4818        };
4819
4820        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
4821            &workflow,
4822            &json!({"email_text":"hello"}),
4823            &MockExecutor,
4824            None,
4825            None,
4826            &options,
4827        )
4828        .await
4829        .expect("workflow should execute");
4830
4831        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
4832        assert_eq!(
4833            output
4834                .metadata
4835                .as_ref()
4836                .and_then(|value| value.get("telemetry"))
4837                .and_then(|telemetry| telemetry.get("sampled"))
4838                .and_then(Value::as_bool),
4839            Some(false)
4840        );
4841    }
4842
4843    #[test]
4844    fn trace_id_from_traceparent_parses_w3c_header() {
4845        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
4846        assert_eq!(
4847            trace_id_from_traceparent(traceparent).as_deref(),
4848            Some("4bf92f3577b34da6a3ce929d0e0e4736")
4849        );
4850    }
4851
4852    #[test]
4853    fn resolve_telemetry_context_marks_traceparent_source() {
4854        let mut options = YamlWorkflowRunOptions::default();
4855        options.trace.context = Some(YamlWorkflowTraceContextInput {
4856            traceparent: Some(
4857                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
4858            ),
4859            ..YamlWorkflowTraceContextInput::default()
4860        });
4861
4862        let context = resolve_telemetry_context(&options, None);
4863
4864        assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
4865        assert_eq!(
4866            context.trace_id.as_deref(),
4867            Some("4bf92f3577b34da6a3ce929d0e0e4736")
4868        );
4869    }
4870
4871    #[tokio::test]
4872    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
4873        let yaml = r#"
4874id: traceparent-derived
4875entry_node: classify
4876nodes:
4877  - id: classify
4878    node_type:
4879      llm_call:
4880        model: gpt-4.1
4881    config:
4882      prompt: |
4883        Classify this email into exactly one category:
4884        {{ input.email_text }}
4885"#;
4886
4887        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4888        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
4889        let options = YamlWorkflowRunOptions {
4890            telemetry: YamlWorkflowTelemetryConfig {
4891                sample_rate: 0.0,
4892                ..YamlWorkflowTelemetryConfig::default()
4893            },
4894            trace: YamlWorkflowTraceOptions {
4895                context: Some(YamlWorkflowTraceContextInput {
4896                    traceparent: Some(traceparent.to_string()),
4897                    ..YamlWorkflowTraceContextInput::default()
4898                }),
4899                tenant: YamlWorkflowTraceTenantContext::default(),
4900            },
4901            model: None,
4902        };
4903
4904        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
4905            &workflow,
4906            &json!({"email_text":"hello"}),
4907            &MockExecutor,
4908            None,
4909            None,
4910            &options,
4911        )
4912        .await
4913        .expect("workflow should execute");
4914
4915        assert_eq!(
4916            output.trace_id.as_deref(),
4917            Some("4bf92f3577b34da6a3ce929d0e0e4736")
4918        );
4919        assert_eq!(
4920            output
4921                .metadata
4922                .as_ref()
4923                .and_then(|value| value.get("telemetry"))
4924                .and_then(|telemetry| telemetry.get("sampled"))
4925                .and_then(Value::as_bool),
4926            Some(false)
4927        );
4928    }
4929
4930    #[tokio::test]
4931    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
4932        let yaml = r#"
4933id: sample-rate-one
4934entry_node: classify
4935nodes:
4936  - id: classify
4937    node_type:
4938      llm_call:
4939        model: gpt-4.1
4940    config:
4941      prompt: |
4942        Classify this email into exactly one category:
4943        {{ input.email_text }}
4944"#;
4945
4946        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4947        let options = YamlWorkflowRunOptions {
4948            telemetry: YamlWorkflowTelemetryConfig {
4949                sample_rate: 1.0,
4950                ..YamlWorkflowTelemetryConfig::default()
4951            },
4952            trace: YamlWorkflowTraceOptions {
4953                context: Some(YamlWorkflowTraceContextInput {
4954                    trace_id: Some("trace-sample-one".to_string()),
4955                    ..YamlWorkflowTraceContextInput::default()
4956                }),
4957                tenant: YamlWorkflowTraceTenantContext::default(),
4958            },
4959            model: None,
4960        };
4961
4962        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
4963            &workflow,
4964            &json!({"email_text":"hello"}),
4965            &MockExecutor,
4966            None,
4967            None,
4968            &options,
4969        )
4970        .await
4971        .expect("workflow should execute");
4972
4973        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
4974        assert_eq!(
4975            output
4976                .metadata
4977                .as_ref()
4978                .and_then(|value| value.get("telemetry"))
4979                .and_then(|telemetry| telemetry.get("sampled"))
4980                .and_then(Value::as_bool),
4981            Some(true)
4982        );
4983    }
4984
4985    #[tokio::test]
4986    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
4987        let yaml = r#"
4988id: sample-rate-deterministic
4989entry_node: classify
4990nodes:
4991  - id: classify
4992    node_type:
4993      llm_call:
4994        model: gpt-4.1
4995    config:
4996      prompt: |
4997        Classify this email into exactly one category:
4998        {{ input.email_text }}
4999"#;
5000
5001        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5002        let options = YamlWorkflowRunOptions {
5003            telemetry: YamlWorkflowTelemetryConfig {
5004                sample_rate: 0.5,
5005                ..YamlWorkflowTelemetryConfig::default()
5006            },
5007            trace: YamlWorkflowTraceOptions {
5008                context: Some(YamlWorkflowTraceContextInput {
5009                    trace_id: Some("trace-sample-deterministic".to_string()),
5010                    ..YamlWorkflowTraceContextInput::default()
5011                }),
5012                tenant: YamlWorkflowTraceTenantContext::default(),
5013            },
5014            model: None,
5015        };
5016
5017        let mut sampled_values = Vec::new();
5018        for _ in 0..3 {
5019            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5020                &workflow,
5021                &json!({"email_text":"hello"}),
5022                &MockExecutor,
5023                None,
5024                None,
5025                &options,
5026            )
5027            .await
5028            .expect("workflow should execute");
5029
5030            let sampled = output
5031                .metadata
5032                .as_ref()
5033                .and_then(|value| value.get("telemetry"))
5034                .and_then(|telemetry| telemetry.get("sampled"))
5035                .and_then(Value::as_bool)
5036                .expect("sampled flag should be present");
5037            sampled_values.push(sampled);
5038        }
5039
5040        assert!(sampled_values
5041            .iter()
5042            .all(|value| *value == sampled_values[0]));
5043    }
5044
5045    #[tokio::test]
5046    async fn workflow_run_options_reject_invalid_sample_rate() {
5047        let yaml = r#"
5048id: sample-rate-invalid
5049entry_node: classify
5050nodes:
5051  - id: classify
5052    node_type:
5053      llm_call:
5054        model: gpt-4.1
5055    config:
5056      prompt: |
5057        Classify this email into exactly one category:
5058        {{ input.email_text }}
5059"#;
5060
5061        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5062        let options = YamlWorkflowRunOptions {
5063            telemetry: YamlWorkflowTelemetryConfig {
5064                sample_rate: 1.1,
5065                ..YamlWorkflowTelemetryConfig::default()
5066            },
5067            ..YamlWorkflowRunOptions::default()
5068        };
5069
5070        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
5071            &workflow,
5072            &json!({"email_text":"hello"}),
5073            &MockExecutor,
5074            None,
5075            None,
5076            &options,
5077        )
5078        .await
5079        .expect_err("invalid sample_rate should fail");
5080
5081        match err {
5082            YamlWorkflowRunError::InvalidInput { message } => {
5083                assert!(message.contains("telemetry.sample_rate"));
5084            }
5085            _ => panic!("expected invalid input error for sample_rate"),
5086        }
5087    }
5088
5089    #[tokio::test]
5090    async fn workflow_run_options_reject_nan_sample_rate() {
5091        let yaml = r#"
5092id: sample-rate-invalid
5093entry_node: classify
5094nodes:
5095  - id: classify
5096    node_type:
5097      llm_call:
5098        model: gpt-4.1
5099    config:
5100      prompt: |
5101        Classify this email into exactly one category:
5102        {{ input.email_text }}
5103"#;
5104
5105        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5106        let options = YamlWorkflowRunOptions {
5107            telemetry: YamlWorkflowTelemetryConfig {
5108                sample_rate: f32::NAN,
5109                ..YamlWorkflowTelemetryConfig::default()
5110            },
5111            ..YamlWorkflowRunOptions::default()
5112        };
5113
5114        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
5115            &workflow,
5116            &json!({"email_text":"hello"}),
5117            &MockExecutor,
5118            None,
5119            None,
5120            &options,
5121        )
5122        .await
5123        .expect_err("nan sample_rate should fail");
5124
5125        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5126    }
5127
5128    #[test]
5129    fn subworkflow_options_inherit_parent_telemetry_configuration() {
5130        let mut parent_options = YamlWorkflowRunOptions::default();
5131        parent_options.telemetry.sample_rate = 0.0;
5132        parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
5133        parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
5134        parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
5135
5136        let parent_context = TraceContext {
5137            trace_id: Some("trace-parent".to_string()),
5138            span_id: Some("span-parent".to_string()),
5139            parent_span_id: None,
5140            traceparent: Some("00-trace-parent-span-parent-01".to_string()),
5141            tracestate: None,
5142            baggage: BTreeMap::new(),
5143        };
5144
5145        let subworkflow_options =
5146            build_subworkflow_options(&parent_options, Some(&parent_context), None);
5147
5148        assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
5149        assert_eq!(
5150            subworkflow_options.telemetry.payload_mode,
5151            YamlWorkflowPayloadMode::RedactedPayload
5152        );
5153        assert_eq!(
5154            subworkflow_options.telemetry.tool_trace_mode,
5155            YamlToolTraceMode::Redacted
5156        );
5157        assert_eq!(
5158            subworkflow_options.trace.tenant.conversation_id.as_deref(),
5159            Some("conv-123")
5160        );
5161        assert_eq!(
5162            subworkflow_options
5163                .trace
5164                .context
5165                .as_ref()
5166                .and_then(|ctx| ctx.trace_id.as_deref()),
5167            Some("trace-parent")
5168        );
5169    }
5170
5171    #[test]
5172    fn trace_tenant_attributes_include_langfuse_aliases() {
5173        let tenant = YamlWorkflowTraceTenantContext {
5174            workspace_id: Some("ws-1".to_string()),
5175            user_id: Some("user-1".to_string()),
5176            conversation_id: Some("conv-1".to_string()),
5177            request_id: Some("req-1".to_string()),
5178            run_id: Some("run-1".to_string()),
5179        };
5180        let mut span = CapturingSpan::default();
5181        apply_trace_tenant_attributes_from_tenant(&mut span, &tenant);
5182
5183        assert_eq!(
5184            span.attributes
5185                .get("tenant.workspace_id")
5186                .map(String::as_str),
5187            Some("ws-1")
5188        );
5189        assert_eq!(
5190            span.attributes.get("tenant.user_id").map(String::as_str),
5191            Some("user-1")
5192        );
5193        assert_eq!(
5194            span.attributes
5195                .get("tenant.conversation_id")
5196                .map(String::as_str),
5197            Some("conv-1")
5198        );
5199        assert_eq!(
5200            span.attributes.get("langfuse.user.id").map(String::as_str),
5201            Some("user-1")
5202        );
5203        assert_eq!(
5204            span.attributes
5205                .get("langfuse.session.id")
5206                .map(String::as_str),
5207            Some("conv-1")
5208        );
5209    }
5210
5211    #[test]
5212    fn langfuse_nerdstats_attributes_are_written_when_enabled() {
5213        let output = YamlWorkflowRunOutput {
5214            workflow_id: "wf-1".to_string(),
5215            entry_node: "start".to_string(),
5216            email_text: "email".to_string(),
5217            trace: vec!["node-1".to_string()],
5218            outputs: BTreeMap::new(),
5219            terminal_node: "node-1".to_string(),
5220            terminal_output: None,
5221            step_timings: vec![YamlStepTiming {
5222                node_id: "node-1".to_string(),
5223                node_kind: "llm_call".to_string(),
5224                model_name: Some("gpt-4.1-mini".to_string()),
5225                elapsed_ms: 42,
5226                prompt_tokens: Some(4),
5227                completion_tokens: Some(6),
5228                total_tokens: Some(10),
5229                reasoning_tokens: Some(2),
5230                tokens_per_second: Some(14.0),
5231            }],
5232            llm_node_metrics: BTreeMap::new(),
5233            llm_node_models: BTreeMap::new(),
5234            total_elapsed_ms: 42,
5235            ttft_ms: Some(7),
5236            total_input_tokens: 4,
5237            total_output_tokens: 6,
5238            total_tokens: 10,
5239            total_reasoning_tokens: Some(2),
5240            tokens_per_second: 14.0,
5241            trace_id: Some("trace-1".to_string()),
5242            metadata: None,
5243        };
5244
5245        let mut span = CapturingSpan::default();
5246        apply_langfuse_nerdstats_attributes(&mut span, &output, true);
5247
5248        assert_eq!(
5249            span.attributes
5250                .get("langfuse.trace.metadata.nerdstats.workflow_id")
5251                .map(String::as_str),
5252            Some("wf-1")
5253        );
5254        assert_eq!(
5255            span.attributes
5256                .get("langfuse.trace.metadata.nerdstats.total_tokens")
5257                .map(String::as_str),
5258            Some("10")
5259        );
5260        assert_eq!(
5261            span.attributes
5262                .get("langfuse.trace.metadata.nerdstats.token_metrics_available")
5263                .map(String::as_str),
5264            Some("true")
5265        );
5266        assert!(span
5267            .attributes
5268            .contains_key("langfuse.trace.metadata.nerdstats"));
5269    }
5270
5271    #[test]
5272    fn langfuse_trace_input_output_and_usage_are_written() {
5273        let output = YamlWorkflowRunOutput {
5274            workflow_id: "wf-1".to_string(),
5275            entry_node: "start".to_string(),
5276            email_text: "email".to_string(),
5277            trace: vec!["node-1".to_string()],
5278            outputs: BTreeMap::new(),
5279            terminal_node: "node-1".to_string(),
5280            terminal_output: Some(json!({"final":"ok"})),
5281            step_timings: Vec::new(),
5282            llm_node_metrics: BTreeMap::new(),
5283            llm_node_models: BTreeMap::new(),
5284            total_elapsed_ms: 10,
5285            ttft_ms: None,
5286            total_input_tokens: 11,
5287            total_output_tokens: 22,
5288            total_tokens: 33,
5289            total_reasoning_tokens: Some(4),
5290            tokens_per_second: 1.5,
5291            trace_id: Some("trace-1".to_string()),
5292            metadata: None,
5293        };
5294
5295        let mut span = CapturingSpan::default();
5296        let input = json!({"email_text":"hello"});
5297        apply_langfuse_trace_input_output_attributes(
5298            &mut span,
5299            &input,
5300            &output,
5301            YamlWorkflowPayloadMode::FullPayload,
5302        );
5303
5304        assert_eq!(
5305            span.attributes
5306                .get("langfuse.trace.input")
5307                .map(String::as_str),
5308            Some("{\"email_text\":\"hello\"}")
5309        );
5310        assert_eq!(
5311            span.attributes
5312                .get("langfuse.trace.output")
5313                .map(String::as_str),
5314            Some("{\"final\":\"ok\"}")
5315        );
5316        assert!(span
5317            .attributes
5318            .contains_key("langfuse.trace.metadata.usage_details"));
5319    }
5320
5321    #[test]
5322    fn langfuse_observation_usage_attributes_are_written() {
5323        let usage = YamlLlmTokenUsage {
5324            prompt_tokens: 7,
5325            completion_tokens: 9,
5326            total_tokens: 16,
5327            reasoning_tokens: Some(3),
5328        };
5329        let mut span = CapturingSpan::default();
5330        apply_langfuse_observation_usage_attributes(&mut span, &usage);
5331
5332        assert_eq!(
5333            span.attributes
5334                .get("gen_ai.usage.input_tokens")
5335                .map(String::as_str),
5336            Some("7")
5337        );
5338        assert_eq!(
5339            span.attributes
5340                .get("gen_ai.usage.output_tokens")
5341                .map(String::as_str),
5342            Some("9")
5343        );
5344        assert_eq!(
5345            span.attributes
5346                .get("gen_ai.usage.total_tokens")
5347                .map(String::as_str),
5348            Some("16")
5349        );
5350        assert_eq!(
5351            span.attributes
5352                .get("gen_ai.usage.reasoning_tokens")
5353                .map(String::as_str),
5354            Some("3")
5355        );
5356        assert!(span
5357            .attributes
5358            .contains_key("langfuse.observation.usage_details"));
5359    }
5360
5361    #[tokio::test]
5362    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
5363        let yaml = r#"
5364id: trace-options-test
5365entry_node: classify
5366nodes:
5367  - id: classify
5368    node_type:
5369      llm_call:
5370        model: gpt-4.1
5371    config:
5372      prompt: |
5373        Classify this email into exactly one category:
5374        {{ input.email_text }}
5375"#;
5376
5377        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5378        let options = YamlWorkflowRunOptions {
5379            telemetry: YamlWorkflowTelemetryConfig {
5380                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
5381                ..YamlWorkflowTelemetryConfig::default()
5382            },
5383            trace: YamlWorkflowTraceOptions {
5384                context: Some(YamlWorkflowTraceContextInput {
5385                    trace_id: Some("trace-fixed-123".to_string()),
5386                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
5387                    ..YamlWorkflowTraceContextInput::default()
5388                }),
5389                tenant: YamlWorkflowTraceTenantContext {
5390                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
5391                    ..YamlWorkflowTraceTenantContext::default()
5392                },
5393            },
5394            model: None,
5395        };
5396
5397        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5398            &workflow,
5399            &json!({"email_text":"hello"}),
5400            &MockExecutor,
5401            None,
5402            None,
5403            &options,
5404        )
5405        .await
5406        .expect("workflow should execute");
5407
5408        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
5409        assert_eq!(
5410            output
5411                .metadata
5412                .as_ref()
5413                .and_then(|value| value.get("telemetry"))
5414                .and_then(|telemetry| telemetry.get("payload_mode"))
5415                .and_then(Value::as_str),
5416            Some("redacted_payload")
5417        );
5418        assert_eq!(
5419            output
5420                .metadata
5421                .as_ref()
5422                .and_then(|value| value.get("trace"))
5423                .and_then(|trace| trace.get("tenant"))
5424                .and_then(|tenant| tenant.get("conversation_id"))
5425                .and_then(Value::as_str),
5426            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
5427        );
5428    }
5429
5430    #[test]
5431    fn streamed_payload_parser_extracts_last_json_object() {
5432        let raw = r#"{"state":"missing_scenario","reason":"ok"}
5433
5434extra reasoning text
5435
5436{"state":"ready","reason":"final"}"#;
5437
5438        let resolved = parse_streamed_structured_payload(raw, false)
5439            .expect("parser should extract final JSON object");
5440        assert_eq!(resolved.payload["state"], "ready");
5441        assert!(resolved.heal_confidence.is_none());
5442    }
5443
5444    #[test]
5445    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
5446        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
5447
5448        let resolved = parse_streamed_structured_payload(raw, false)
5449            .expect("parser should recover final structured JSON object");
5450        assert_eq!(resolved.payload["state"], "ready");
5451    }
5452
5453    #[test]
5454    fn streamed_payload_parser_handles_markdown_with_heal() {
5455        let raw = r#"Some preface
5456```json
5457{
5458  "state": "missing_scenario",
5459  "reason": "Need more details"
5460}
5461```
5462Some trailing explanation"#;
5463
5464        let resolved = parse_streamed_structured_payload(raw, true)
5465            .expect("heal path should parse JSON block");
5466        assert_eq!(resolved.payload["state"], "missing_scenario");
5467        assert!(resolved.heal_confidence.is_some());
5468    }
5469
5470    #[test]
5471    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
5472        let raw = "No JSON in this streamed output";
5473        let error = parse_streamed_structured_payload(raw, false)
5474            .expect_err("strict stream parse should fail without JSON candidate");
5475        assert!(error.contains("no JSON object candidate found"));
5476    }
5477
5478    #[test]
5479    fn include_raw_stream_debug_events_defaults_to_false() {
5480        let _guard = stream_debug_env_lock()
5481            .lock()
5482            .expect("stream debug env lock should not be poisoned");
5483        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5484        assert!(!include_raw_stream_debug_events());
5485    }
5486
5487    #[test]
5488    fn include_raw_stream_debug_events_accepts_truthy_values() {
5489        let _guard = stream_debug_env_lock()
5490            .lock()
5491            .expect("stream debug env lock should not be poisoned");
5492        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
5493        assert!(include_raw_stream_debug_events());
5494        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5495    }
5496
5497    #[test]
5498    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
5499        let mut filter = StructuredJsonDeltaFilter::default();
5500        let chunks = vec![
5501            "I will think first... ",
5502            "{\"state\":\"missing_scenario\",",
5503            "\"reason\":\"Need more details\"}",
5504            " additional commentary",
5505        ];
5506
5507        let filtered = chunks
5508            .into_iter()
5509            .filter_map(|chunk| filter.split(chunk).0)
5510            .collect::<String>();
5511
5512        assert_eq!(
5513            filtered,
5514            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
5515        );
5516    }
5517
5518    #[test]
5519    fn structured_json_delta_filter_handles_braces_inside_strings() {
5520        let mut filter = StructuredJsonDeltaFilter::default();
5521        let chunks = vec![
5522            "preface ",
5523            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
5524            " trailing",
5525        ];
5526
5527        let filtered = chunks
5528            .into_iter()
5529            .filter_map(|chunk| filter.split(chunk).0)
5530            .collect::<String>();
5531
5532        assert_eq!(
5533            filtered,
5534            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
5535        );
5536    }
5537
5538    #[test]
5539    fn render_json_object_as_text_converts_top_level_fields() {
5540        let rendered =
5541            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
5542        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
5543
5544        assert_eq!(lines.len(), 3);
5545        assert!(lines.contains("question: q"));
5546        assert!(lines.contains("confidence: 0.8"));
5547        assert!(lines.contains("nested: {\"a\":1}"));
5548    }
5549
5550    #[test]
5551    fn stream_json_as_text_formatter_emits_once_when_complete() {
5552        let mut formatter = StreamJsonAsTextFormatter::default();
5553        formatter.push("{\"question\":\"hello\"}");
5554
5555        let first = formatter.emit_if_ready(true);
5556        let second = formatter.emit_if_ready(true);
5557
5558        assert_eq!(first, Some("question: hello".to_string()));
5559        assert_eq!(second, None);
5560    }
5561
5562    #[test]
5563    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
5564        let expr = "$.nodes.classify.output.output_total == 1";
5565        let rewritten = rewrite_yaml_condition_to_ir(expr);
5566        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
5567    }
5568
5569    #[tokio::test]
5570    async fn validates_workflow_input_before_ir_runtime_path() {
5571        let yaml = r#"
5572id: chat-workflow
5573entry_node: classify
5574nodes:
5575  - id: classify
5576    node_type:
5577      llm_call:
5578        model: gpt-4.1
5579    config:
5580      prompt: |
5581        classify
5582"#;
5583
5584        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5585        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
5586            .await
5587            .expect_err("non-object input should fail before execution");
5588
5589        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5590    }
5591
5592    #[test]
5593    fn interpolate_template_supports_dollar_prefixed_paths() {
5594        let context = json!({
5595            "input": {
5596                "email_text": "hello"
5597            }
5598        });
5599
5600        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
5601        assert_eq!(rendered, "value=hello");
5602    }
5603}