// simple_agents_workflow/yaml_runner.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13    ToolCall, ToolChoice, ToolChoiceMode, ToolChoiceTool, ToolDefinition, ToolType,
14};
15use simple_agents_core::{
16    CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
17};
18use simple_agents_healing::JsonishParser;
19use thiserror::Error;
20
21use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
22use crate::observability::tracing::{
23    flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
24};
25use crate::runtime::{
26    LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
27    ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
28    WorkflowRuntimeOptions,
29};
30use crate::validation::validate_and_normalize;
31use crate::visualize::workflow_to_mermaid;
32
33mod runner;
34pub use runner::WorkflowRunner;
35mod api;
36mod client_executor;
37mod context;
38mod events;
39mod execute;
40mod globals;
41mod llm_tools;
42mod node_execution;
43mod spans;
44mod typed_contracts;
45mod validation;
46pub use api::{
47    run_email_workflow_yaml, run_email_workflow_yaml_file,
48    run_email_workflow_yaml_file_with_client,
49    run_email_workflow_yaml_file_with_client_and_custom_worker,
50    run_email_workflow_yaml_file_with_client_and_custom_worker_and_events,
51    run_email_workflow_yaml_with_client, run_email_workflow_yaml_with_client_and_custom_worker,
52    run_email_workflow_yaml_with_client_and_custom_worker_and_events,
53    run_email_workflow_yaml_with_custom_worker,
54    run_email_workflow_yaml_with_custom_worker_and_events, run_workflow_yaml,
55    run_workflow_yaml_file, run_workflow_yaml_file_with_client,
56    run_workflow_yaml_file_with_client_and_custom_worker,
57    run_workflow_yaml_file_with_client_and_custom_worker_and_events,
58    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options,
59    run_workflow_yaml_with_client, run_workflow_yaml_with_client_and_custom_worker,
60    run_workflow_yaml_with_client_and_custom_worker_and_events,
61    run_workflow_yaml_with_custom_worker, run_workflow_yaml_with_custom_worker_and_events,
62};
63use client_executor::BorrowedClientExecutor;
64use context::{
65    build_yaml_context_from_ir_scope, collect_template_bindings, evaluate_switch_condition,
66    interpolate_template, json_type_name, parse_messages_from_context, resolve_path,
67};
68use globals::{apply_set_globals, apply_update_globals};
69use llm_tools::{
70    default_llm_output_schema, llm_output_schema_for_node, normalize_llm_tools,
71    normalize_tool_choice,
72};
73pub use typed_contracts::{
74    YamlWorkflowEventType, YamlWorkflowNodeKind, YamlWorkflowNodeOutputRecord,
75    YamlWorkflowRunTypedOutput, YamlWorkflowTypedEvent,
76};
77pub use validation::verify_yaml_workflow;
78
/// Reserved id for the synthetic entry node (prefixed to avoid colliding with
/// user-defined node ids).
const YAML_START_NODE_ID: &str = "__yaml_start";
/// Reserved id for the built-in LLM-call tool.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";
/// Upper bound on the size of a workflow YAML document (1 MiB).
const MAX_WORKFLOW_YAML_BYTES: u64 = 1024 * 1024;
/// Upper bound on YAML nesting depth accepted by the loader.
const MAX_WORKFLOW_YAML_DEPTH: usize = 64;

/// Process-wide sequence mixed into generated trace ids so two runs started in
/// the same nanosecond still get distinct ids (see `generate_trace_id`).
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
85
/// Per-node timing entry recorded for every executed workflow step.
/// Token fields are populated only for LLM-call steps whose provider
/// reported usage (see `workflow_nerdstats`, which treats missing
/// `total_tokens` on an `llm_call` step as "usage unavailable").
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Workflow node id this timing belongs to.
    pub node_id: String,
    /// Node kind label, e.g. "llm_call".
    pub node_kind: String,
    /// Model used, when the step was an LLM call.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Wall-clock duration of the step in milliseconds.
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    /// Reasoning ("thinking") tokens, when the provider reports them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Completion-token throughput — presumably computed by
    /// `completion_tokens_per_second`; confirm at the producing call site.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
104
/// Usage metrics for a single LLM node that reported token usage.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Wall-clock duration of the LLM call in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning ("thinking") tokens, when the provider reports them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Completion tokens per second for this node.
    pub tokens_per_second: f64,
}
115
/// Aggregated result of one YAML workflow run: execution trace, per-node
/// outputs and timings, token totals, and optional trace/telemetry extras.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    /// Node the run started from.
    pub entry_node: String,
    /// Input text of the run — named after the `run_email_workflow_*` entry
    /// points; presumably the raw user input. Confirm against `api` module.
    pub email_text: String,
    /// Node ids in execution order.
    pub trace: Vec<String>,
    /// Per-node output payloads, keyed by node id.
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    /// LLM metrics keyed by node id, for nodes that reported usage.
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    /// Model actually used per LLM node, keyed by node id.
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    /// Time to first token, when streaming reported it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    /// `None` when no LLM call reported reasoning tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    /// Output tokens per second over the whole run.
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Telemetry/tenant metadata (see `workflow_metadata_with_trace`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
142
/// Controls how payloads are rendered into span attributes: verbatim JSON or a
/// `{redacted, value_type}` stub (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    /// Emit the full payload JSON.
    #[default]
    FullPayload,
    /// Emit only a redaction stub carrying the payload's JSON type name.
    RedactedPayload,
}
150
/// Controls how tool-call payloads appear in traces: full, a redaction stub,
/// or dropped entirely (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    /// Payload is replaced with JSON null.
    Off,
}
159
/// Caller-supplied trace propagation data used to attach this run to an
/// existing distributed trace. All fields are optional.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// W3C `traceparent` header value; used as a trace-id fallback
    /// (see `resolve_run_trace_id_with_source`).
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    /// Key/value baggage propagated to custom workers.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
175
/// Optional tenant identifiers recorded as span attributes
/// (see `apply_trace_tenant_attributes_from_tenant`) and echoed into run
/// metadata and custom-worker contexts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
189
/// Telemetry knobs for a run. The serde defaults mirror the manual `Default`
/// impl: everything enabled, sample rate 1.0, full payloads, 30-day retention.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch; when false no trace id is resolved at all
    /// (see `resolve_run_trace_id_with_source`).
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Emit the "nerdstats" performance-summary span attributes.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Probability in [0.0, 1.0] that a trace is sampled; validated by
    /// `validate_sample_rate` and applied by `should_sample_trace`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    /// Recorded into run metadata; enforcement is not visible in this module.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    /// Recorded into run metadata; semantics not visible in this module.
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
207
// NOTE(review): these literals duplicate the serde default helpers
// (`default_true`, `default_sample_rate`, `default_retention_days`) and the
// `#[default]` enum variants — keep them in sync when changing either side.
impl Default for YamlWorkflowTelemetryConfig {
    /// Everything enabled, full payloads, sample rate 1.0, 30-day retention.
    fn default() -> Self {
        Self {
            enabled: true,
            nerdstats: true,
            sample_rate: 1.0,
            payload_mode: YamlWorkflowPayloadMode::FullPayload,
            retention_days: 30,
            multi_tenant: true,
            tool_trace_mode: YamlToolTraceMode::Full,
        }
    }
}
221
/// Trace-related run options: an optional inbound propagation context plus
/// tenant identifiers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
229
/// Top-level options for a YAML workflow run.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    /// Run-wide model override; a non-blank value takes precedence over each
    /// node's configured model (see `resolve_requested_model`).
    #[serde(default)]
    pub model: Option<String>,
}
239
/// Token counts reported by the provider for one LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning ("thinking") tokens, when the provider reports them.
    pub reasoning_tokens: Option<u32>,
}
247
/// Result of executing one LLM node: the resolved payload, provider-reported
/// usage (if any), time-to-first-token, and any tool calls made on the way.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    /// Time to first token in milliseconds, when streaming reported it.
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
255
/// Trace record for one tool invocation made during an LLM node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    /// Tool arguments (possibly redacted per `YamlToolTraceMode`).
    pub arguments: Value,
    /// Tool output, when the call produced one.
    pub output: Option<Value>,
    /// Outcome label — the exact values are set by the executor, not visible
    /// in this module.
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
266
/// Running token totals across all LLM calls in a run, widened to `u64` so
/// summing many `u32` per-call counts cannot overflow.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    /// Stays `None` until at least one call reports reasoning tokens.
    reasoning_tokens: Option<u64>,
}
274
275impl YamlTokenTotals {
276    fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
277        self.input_tokens += u64::from(usage.prompt_tokens);
278        self.output_tokens += u64::from(usage.completion_tokens);
279        self.total_tokens += u64::from(usage.total_tokens);
280
281        if let Some(reasoning_tokens) = usage.reasoning_tokens {
282            let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
283            self.reasoning_tokens = Some(next);
284        }
285    }
286
287    fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
288        if elapsed_ms == 0 {
289            return 0.0;
290        }
291        round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
292    }
293}
294
/// Round `value` to two decimal places (half-away-from-zero via `f64::round`).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = (value * 100.0).round();
    scaled / 100.0
}
298
299fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
300    if elapsed_ms == 0 {
301        return 0.0;
302    }
303    round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
304}
305
/// Pick the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's own configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => node_model.to_string(),
    }
}
318
// serde `#[serde(default = …)]` helpers — keep the values in sync with the
// manual `Default` impl for `YamlWorkflowTelemetryConfig`.
fn default_true() -> bool {
    true
}

fn default_sample_rate() -> f32 {
    1.0
}

fn default_retention_days() -> u32 {
    30
}
330
331fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
332    if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
333        return Ok(());
334    }
335
336    Err(YamlWorkflowRunError::InvalidInput {
337        message: format!(
338            "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
339        ),
340    })
341}
342
/// Deterministic sampling decision: hash the trace id with 64-bit FNV-1a and
/// keep the trace when the hash, mapped onto [0, 1), falls below
/// `sample_rate`. Rates >= 1.0 always sample; rates <= 0.0 never do.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a over the raw trace-id bytes.
    let hash = trace_id.bytes().fold(0xcbf29ce484222325u64, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
    });

    (hash as f64) / (u64::MAX as f64) < f64::from(sample_rate)
}
360
/// Extract the 32-hex-digit trace id from a W3C `traceparent` value of the
/// shape `version-traceid-spanid-flags`. Returns `None` unless there are
/// exactly four fields, the version is 2 hex digits, and the trace id is
/// 32 hex digits and not all zeros. The id is normalized to lowercase.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let fields: Vec<&str> = traceparent.split('-').collect();
    let [version, trace_id, _span_id, _flags] = fields[..] else {
        return None;
    };

    let all_hex = |text: &str| text.chars().all(|ch| ch.is_ascii_hexdigit());

    if version.len() != 2 || !all_hex(version) {
        return None;
    }
    if trace_id.len() != 32 || !all_hex(trace_id) {
        return None;
    }
    // An all-zero trace id is invalid per the W3C Trace Context spec.
    if trace_id.bytes().all(|b| b == b'0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
387
/// Provenance of the trace id chosen for a run — mirrors the precedence order
/// in `resolve_run_trace_id_with_source` and is surfaced as
/// `telemetry.trace_id_source` in run metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry disabled; no trace id at all.
    Disabled,
    /// Explicit id in the run options' trace context.
    ExplicitTraceId,
    /// Inherited from a parent trace context.
    ParentTraceId,
    /// Parsed from the run options' `traceparent` header.
    Traceparent,
    /// Parsed from the parent context's `traceparent` header.
    ParentTraceparent,
    /// Freshly generated (see `generate_trace_id`).
    Generated,
}
397
/// Trace id, sampling decision, and id provenance resolved once per run.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    /// Always false when `trace_id` is `None` (telemetry disabled).
    sampled: bool,
    trace_id_source: TraceIdSource,
}
404
405fn resolve_run_trace_id_with_source(
406    options: &YamlWorkflowRunOptions,
407    parent_trace_context: Option<&TraceContext>,
408) -> (Option<String>, TraceIdSource) {
409    if !options.telemetry.enabled {
410        return (None, TraceIdSource::Disabled);
411    }
412
413    if let Some(trace_id) = options
414        .trace
415        .context
416        .as_ref()
417        .and_then(|context| context.trace_id.clone())
418    {
419        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
420    }
421
422    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
423        return (Some(trace_id), TraceIdSource::ParentTraceId);
424    }
425
426    if let Some(trace_id) = options
427        .trace
428        .context
429        .as_ref()
430        .and_then(|context| context.traceparent.as_deref())
431        .and_then(trace_id_from_traceparent)
432    {
433        return (Some(trace_id), TraceIdSource::Traceparent);
434    }
435
436    if let Some(trace_id) = parent_trace_context
437        .and_then(|context| context.traceparent.as_deref())
438        .and_then(trace_id_from_traceparent)
439    {
440        return (Some(trace_id), TraceIdSource::ParentTraceparent);
441    }
442
443    (Some(generate_trace_id()), TraceIdSource::Generated)
444}
445
446fn resolve_telemetry_context(
447    options: &YamlWorkflowRunOptions,
448    parent_trace_context: Option<&TraceContext>,
449) -> ResolvedTelemetryContext {
450    let (trace_id, trace_id_source) =
451        resolve_run_trace_id_with_source(options, parent_trace_context);
452    let sampled = trace_id
453        .as_deref()
454        .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
455        .unwrap_or(false);
456
457    ResolvedTelemetryContext {
458        trace_id,
459        sampled,
460        trace_id_source,
461    }
462}
463
464fn generate_trace_id() -> String {
465    use std::time::{SystemTime, UNIX_EPOCH};
466
467    let now_nanos = SystemTime::now()
468        .duration_since(UNIX_EPOCH)
469        .map(|duration| duration.as_nanos())
470        .unwrap_or(0);
471    let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
472    format!("{:032x}", now_nanos ^ sequence)
473}
474
475fn workflow_metadata_with_trace(
476    options: &YamlWorkflowRunOptions,
477    trace_id: &str,
478    sampled: bool,
479    trace_id_source: TraceIdSource,
480) -> Value {
481    json!({
482        "telemetry": {
483            "trace_id": trace_id,
484            "trace_id_source": match trace_id_source {
485                TraceIdSource::Disabled => "disabled",
486                TraceIdSource::ExplicitTraceId => "explicit_trace_id",
487                TraceIdSource::ParentTraceId => "parent_trace_id",
488                TraceIdSource::Traceparent => "traceparent",
489                TraceIdSource::ParentTraceparent => "parent_traceparent",
490                TraceIdSource::Generated => "generated",
491            },
492            "enabled": options.telemetry.enabled,
493            "sampled": sampled,
494            "nerdstats": options.telemetry.nerdstats,
495            "sample_rate": options.telemetry.sample_rate,
496            "payload_mode": match options.telemetry.payload_mode {
497                YamlWorkflowPayloadMode::FullPayload => "full_payload",
498                YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
499            },
500            "retention_days": options.telemetry.retention_days,
501            "multi_tenant": options.telemetry.multi_tenant,
502            "tool_trace_mode": match options.telemetry.tool_trace_mode {
503                YamlToolTraceMode::Full => "full",
504                YamlToolTraceMode::Redacted => "redacted",
505                YamlToolTraceMode::Off => "off",
506            },
507        },
508        "trace": {
509            "tenant": {
510                "workspace_id": options.trace.tenant.workspace_id,
511                "user_id": options.trace.tenant.user_id,
512                "conversation_id": options.trace.tenant.conversation_id,
513                "request_id": options.trace.tenant.request_id,
514                "run_id": options.trace.tenant.run_id,
515            }
516        },
517    })
518}
519
520fn apply_trace_identity_attributes(span: &mut dyn WorkflowSpan, trace_id: Option<&str>) {
521    if let Some(value) = trace_id {
522        span.set_attribute("trace_id", value);
523    }
524}
525
526fn apply_trace_tenant_attributes_from_tenant(
527    span: &mut dyn WorkflowSpan,
528    tenant: &YamlWorkflowTraceTenantContext,
529) {
530    if let Some(workspace_id) = tenant.workspace_id.as_deref() {
531        span.set_attribute("tenant.workspace_id", workspace_id);
532    }
533    if let Some(user_id) = tenant.user_id.as_deref() {
534        span.set_attribute("tenant.user_id", user_id);
535        span.set_attribute("user.id", user_id);
536        span.set_attribute("langfuse.user.id", user_id);
537    }
538    if let Some(conversation_id) = tenant.conversation_id.as_deref() {
539        span.set_attribute("tenant.conversation_id", conversation_id);
540        span.set_attribute("session.id", conversation_id);
541        span.set_attribute("langfuse.session.id", conversation_id);
542    }
543    if let Some(request_id) = tenant.request_id.as_deref() {
544        span.set_attribute("tenant.request_id", request_id);
545    }
546    if let Some(run_id) = tenant.run_id.as_deref() {
547        span.set_attribute("tenant.run_id", run_id);
548    }
549}
550
/// Convenience wrapper: apply the tenant attributes carried in `options`.
fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
    apply_trace_tenant_attributes_from_tenant(span, &options.trace.tenant);
}
554
555fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
556    let llm_nodes_without_usage: Vec<String> = output
557        .step_timings
558        .iter()
559        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
560        .map(|step| step.node_id.clone())
561        .collect();
562    let token_metrics_available = llm_nodes_without_usage.is_empty();
563    let token_metrics_source = if token_metrics_available {
564        "provider_usage"
565    } else {
566        "provider_stream_usage_unavailable"
567    };
568    let total_input_tokens = if token_metrics_available {
569        json!(output.total_input_tokens)
570    } else {
571        Value::Null
572    };
573    let total_output_tokens = if token_metrics_available {
574        json!(output.total_output_tokens)
575    } else {
576        Value::Null
577    };
578    let total_tokens = if token_metrics_available {
579        json!(output.total_tokens)
580    } else {
581        Value::Null
582    };
583    let total_reasoning_tokens = if token_metrics_available {
584        json!(output.total_reasoning_tokens)
585    } else {
586        Value::Null
587    };
588    let tokens_per_second = if token_metrics_available {
589        json!(output.tokens_per_second)
590    } else {
591        Value::Null
592    };
593
594    json!({
595        "workflow_id": output.workflow_id,
596        "terminal_node": output.terminal_node,
597        "total_elapsed_ms": output.total_elapsed_ms,
598        "ttft_ms": output.ttft_ms,
599        "step_details": output.step_timings,
600        "total_input_tokens": total_input_tokens,
601        "total_output_tokens": total_output_tokens,
602        "total_tokens": total_tokens,
603        "total_reasoning_tokens": total_reasoning_tokens,
604        "tokens_per_second": tokens_per_second,
605        "trace_id": output.trace_id,
606        "token_metrics_available": token_metrics_available,
607        "token_metrics_source": token_metrics_source,
608        "llm_nodes_without_usage": llm_nodes_without_usage,
609    })
610}
611
612fn apply_langfuse_nerdstats_attributes(
613    span: &mut dyn WorkflowSpan,
614    output: &YamlWorkflowRunOutput,
615    enabled: bool,
616) {
617    if !enabled {
618        return;
619    }
620
621    let nerdstats = workflow_nerdstats(output);
622    let nerdstats_json = nerdstats.to_string();
623    span.set_attribute("langfuse.trace.metadata.nerdstats", nerdstats_json.as_str());
624
625    span.set_attribute(
626        "langfuse.trace.metadata.nerdstats.workflow_id",
627        output.workflow_id.as_str(),
628    );
629    span.set_attribute(
630        "langfuse.trace.metadata.nerdstats.terminal_node",
631        output.terminal_node.as_str(),
632    );
633    span.set_attribute(
634        "langfuse.trace.metadata.nerdstats.total_elapsed_ms",
635        output.total_elapsed_ms.to_string().as_str(),
636    );
637    span.set_attribute(
638        "langfuse.trace.metadata.nerdstats.step_details_count",
639        output.step_timings.len().to_string().as_str(),
640    );
641    span.set_attribute(
642        "langfuse.trace.metadata.nerdstats.total_input_tokens",
643        output.total_input_tokens.to_string().as_str(),
644    );
645    span.set_attribute(
646        "langfuse.trace.metadata.nerdstats.total_output_tokens",
647        output.total_output_tokens.to_string().as_str(),
648    );
649    span.set_attribute(
650        "langfuse.trace.metadata.nerdstats.total_tokens",
651        output.total_tokens.to_string().as_str(),
652    );
653    span.set_attribute(
654        "langfuse.trace.metadata.nerdstats.tokens_per_second",
655        output.tokens_per_second.to_string().as_str(),
656    );
657
658    if let Some(ttft_ms) = output.ttft_ms {
659        span.set_attribute(
660            "langfuse.trace.metadata.nerdstats.ttft_ms",
661            ttft_ms.to_string().as_str(),
662        );
663    }
664
665    if let Some(reasoning_tokens) = output.total_reasoning_tokens {
666        span.set_attribute(
667            "langfuse.trace.metadata.nerdstats.total_reasoning_tokens",
668            reasoning_tokens.to_string().as_str(),
669        );
670    }
671
672    let llm_nodes_without_usage_count = output
673        .step_timings
674        .iter()
675        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
676        .count();
677    let token_metrics_available = llm_nodes_without_usage_count == 0;
678    span.set_attribute(
679        "langfuse.trace.metadata.nerdstats.token_metrics_available",
680        if token_metrics_available {
681            "true"
682        } else {
683            "false"
684        },
685    );
686    span.set_attribute(
687        "langfuse.trace.metadata.nerdstats.token_metrics_source",
688        if token_metrics_available {
689            "provider_usage"
690        } else {
691            "provider_stream_usage_unavailable"
692        },
693    );
694    span.set_attribute(
695        "langfuse.trace.metadata.nerdstats.llm_nodes_without_usage_count",
696        llm_nodes_without_usage_count.to_string().as_str(),
697    );
698}
699
700fn apply_langfuse_trace_input_output_attributes(
701    span: &mut dyn WorkflowSpan,
702    workflow_input: &Value,
703    output: &YamlWorkflowRunOutput,
704    payload_mode: YamlWorkflowPayloadMode,
705) {
706    let trace_input = payload_for_span(payload_mode, workflow_input);
707    span.set_attribute("langfuse.trace.input", trace_input.as_str());
708
709    if let Some(terminal_output) = output.terminal_output.as_ref() {
710        let trace_output = payload_for_span(payload_mode, terminal_output);
711        span.set_attribute("langfuse.trace.output", trace_output.as_str());
712    }
713
714    let usage_details = json!({
715        "input": output.total_input_tokens,
716        "output": output.total_output_tokens,
717        "total": output.total_tokens,
718        "reasoning": output.total_reasoning_tokens,
719    })
720    .to_string();
721    span.set_attribute(
722        "langfuse.trace.metadata.usage_details",
723        usage_details.as_str(),
724    );
725}
726
727fn apply_langfuse_observation_usage_attributes(
728    span: &mut dyn WorkflowSpan,
729    usage: &YamlLlmTokenUsage,
730) {
731    let usage_details = json!({
732        "input": usage.prompt_tokens,
733        "output": usage.completion_tokens,
734        "total": usage.total_tokens,
735        "reasoning": usage.reasoning_tokens,
736    })
737    .to_string();
738    span.set_attribute("langfuse.observation.usage_details", usage_details.as_str());
739    span.set_attribute(
740        "gen_ai.usage.input_tokens",
741        usage.prompt_tokens.to_string().as_str(),
742    );
743    span.set_attribute(
744        "gen_ai.usage.output_tokens",
745        usage.completion_tokens.to_string().as_str(),
746    );
747    span.set_attribute(
748        "gen_ai.usage.total_tokens",
749        usage.total_tokens.to_string().as_str(),
750    );
751    if let Some(reasoning_tokens) = usage.reasoning_tokens {
752        span.set_attribute(
753            "gen_ai.usage.reasoning_tokens",
754            reasoning_tokens.to_string().as_str(),
755        );
756    }
757}
758
759fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
760    match mode {
761        YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
762        YamlWorkflowPayloadMode::RedactedPayload => json!({
763            "redacted": true,
764            "value_type": match payload {
765                Value::Null => "null",
766                Value::Bool(_) => "bool",
767                Value::Number(_) => "number",
768                Value::String(_) => "string",
769                Value::Array(_) => "array",
770                Value::Object(_) => "object",
771            }
772        })
773        .to_string(),
774    }
775}
776
777fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
778    match mode {
779        YamlToolTraceMode::Full => payload.clone(),
780        YamlToolTraceMode::Redacted => json!({
781            "redacted": true,
782            "value_type": json_type_name(payload),
783        }),
784        YamlToolTraceMode::Off => Value::Null,
785    }
786}
787
788fn validate_json_schema(schema: &Value) -> Result<(), String> {
789    jsonschema::JSONSchema::compile(schema)
790        .map(|_| ())
791        .map_err(|error| format!("invalid JSON schema: {error}"))
792}
793
794fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
795    let validator = jsonschema::JSONSchema::compile(schema)
796        .map_err(|error| format!("invalid JSON schema: {error}"))?;
797    if let Err(errors) = validator.validate(instance) {
798        let message = errors
799            .into_iter()
800            .next()
801            .map(|error| error.to_string())
802            .unwrap_or_else(|| "unknown schema validation error".to_string());
803        return Err(format!("schema validation failed: {message}"));
804    }
805    Ok(())
806}
807
808fn schema_type(schema: &Value) -> Option<&str> {
809    schema.get("type").and_then(Value::as_str)
810}
811
812fn schema_expects_object(schema: &Value) -> bool {
813    schema_type(schema) == Some("object")
814}
815
816fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
817    options.trace.context.as_ref().map(|input| TraceContext {
818        trace_id: input.trace_id.clone(),
819        span_id: input.span_id.clone(),
820        parent_span_id: input.parent_span_id.clone(),
821        traceparent: input.traceparent.clone(),
822        tracestate: input.tracestate.clone(),
823        baggage: input.baggage.clone(),
824    })
825}
826
827fn merged_trace_context_for_worker(
828    span_context: Option<&TraceContext>,
829    resolved_trace_id: Option<&str>,
830    options: &YamlWorkflowRunOptions,
831) -> TraceContext {
832    let input_context = options.trace.context.as_ref();
833    let baggage = if let Some(context) = span_context {
834        if !context.baggage.is_empty() {
835            context.baggage.clone()
836        } else {
837            input_context
838                .map(|value| value.baggage.clone())
839                .unwrap_or_default()
840        }
841    } else {
842        input_context
843            .map(|value| value.baggage.clone())
844            .unwrap_or_default()
845    };
846
847    TraceContext {
848        trace_id: span_context
849            .and_then(|context| context.trace_id.clone())
850            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
851            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
852        span_id: span_context
853            .and_then(|context| context.span_id.clone())
854            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
855        parent_span_id: span_context
856            .and_then(|context| context.parent_span_id.clone())
857            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
858        traceparent: span_context
859            .and_then(|context| context.traceparent.clone())
860            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
861        tracestate: span_context
862            .and_then(|context| context.tracestate.clone())
863            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
864        baggage,
865    }
866}
867
868fn custom_worker_context_with_trace(
869    context: &Value,
870    trace_context: &TraceContext,
871    tenant_context: &YamlWorkflowTraceTenantContext,
872) -> Value {
873    let mut context_with_trace = context.clone();
874    let Some(root) = context_with_trace.as_object_mut() else {
875        return context_with_trace;
876    };
877
878    root.insert(
879        "trace".to_string(),
880        json!({
881            "context": {
882                "trace_id": trace_context.trace_id,
883                "span_id": trace_context.span_id,
884                "parent_span_id": trace_context.parent_span_id,
885                "traceparent": trace_context.traceparent,
886                "tracestate": trace_context.tracestate,
887                "baggage": trace_context.baggage,
888            },
889            "tenant": {
890                "workspace_id": tenant_context.workspace_id,
891                "user_id": tenant_context.user_id,
892                "conversation_id": tenant_context.conversation_id,
893                "request_id": tenant_context.request_id,
894                "run_id": tenant_context.run_id,
895            }
896        }),
897    );
898
899    context_with_trace
900}
901
/// Whether raw streaming debug events should be emitted, controlled by the
/// `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` env var.
///
/// Accepts "1", "true", "yes", or "on" (case-insensitive, surrounding
/// whitespace ignored); an unset variable or any other value means off.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            let flag = value.trim().to_ascii_lowercase();
            matches!(flag.as_str(), "1" | "true" | "yes" | "on")
        })
        .unwrap_or(false)
}
911
/// Result of resolving a fully streamed raw completion into a JSON payload.
#[derive(Debug)]
struct StreamedPayloadResolution {
    // The parsed (possibly healed) JSON payload.
    payload: Value,
    // Confidence reported by the healing parser; `None` when the payload
    // parsed strictly without healing.
    heal_confidence: Option<f32>,
}
917
918#[derive(Debug, Default)]
919struct StreamJsonAsTextFormatter {
920    raw_json: String,
921    emitted: bool,
922}
923
924impl StreamJsonAsTextFormatter {
925    fn push(&mut self, chunk: &str) {
926        self.raw_json.push_str(chunk);
927    }
928
929    fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
930        if self.emitted || !complete {
931            return None;
932        }
933        self.emitted = true;
934        Some(render_json_object_as_text(self.raw_json.as_str()))
935    }
936}
937
938fn render_json_object_as_text(raw_json: &str) -> String {
939    let value = match serde_json::from_str::<Value>(raw_json) {
940        Ok(value) => value,
941        Err(_) => return raw_json.to_string(),
942    };
943    let Some(object) = value.as_object() else {
944        return raw_json.to_string();
945    };
946
947    let mut lines = Vec::with_capacity(object.len());
948    for (key, value) in object {
949        let rendered = match value {
950            Value::String(text) => text.clone(),
951            _ => value.to_string(),
952        };
953        lines.push(format!("{key}: {rendered}"));
954    }
955    lines.join("\n")
956}
957
/// Classifies a streamed token delta for event consumers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    /// Part of the node's actual output stream.
    Output,
    /// Surrounding free-form text outside the structured output
    /// (see `StructuredJsonDeltaFilter`, which routes non-JSON text here).
    Thinking,
}
964
/// Incremental splitter that routes streamed text into a structured JSON
/// "output" stream and a free-form "thinking" stream.
///
/// Text before the first `{` is thinking; from the first `{` until the
/// matching closing `}` (tracked with brace depth and JSON string/escape
/// state) everything is output; all text after the object closes is
/// thinking again. State persists across calls so deltas can be fed
/// chunk by chunk.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    started: bool,
    completed: bool,
    depth: u32,
    in_string: bool,
    escape: bool,
}

impl StructuredJsonDeltaFilter {
    /// Split one delta into `(output, thinking)` fragments; either side is
    /// `None` when it received no characters.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut json_part = String::new();
        let mut prose_part = String::new();

        for ch in delta.chars() {
            // Once the object has closed, everything is prose again.
            if self.completed {
                prose_part.push(ch);
                continue;
            }

            // Before the first '{', characters are prose.
            if !self.started {
                if ch == '{' {
                    self.started = true;
                    self.depth = 1;
                    json_part.push(ch);
                } else {
                    prose_part.push(ch);
                }
                continue;
            }

            json_part.push(ch);

            // Inside a JSON string: only track escapes and the closing quote;
            // braces in strings must not affect depth.
            if self.in_string {
                if self.escape {
                    self.escape = false;
                } else if ch == '\\' {
                    self.escape = true;
                } else if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let json_part = (!json_part.is_empty()).then_some(json_part);
        let prose_part = (!prose_part.is_empty()).then_some(prose_part);
        (json_part, prose_part)
    }

    /// Whether the structured object has fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
1049
/// Return the trimmed contents of the last ```json fenced code block in
/// `raw`, or `None` when no closed, non-empty block exists.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const FENCE_OPEN: &str = "```json";
    const FENCE_CLOSE: &str = "```";

    // rfind picks the last fenced block when several are present.
    let open = raw.rfind(FENCE_OPEN)?;
    let body = &raw[open + FENCE_OPEN.len()..];
    let close = body.find(FENCE_CLOSE)?;
    let block = body[..close].trim();
    (!block.is_empty()).then_some(block)
}
1060
/// Scan forward from byte offset `start_index` and return the trimmed slice
/// covering the first balanced `{...}` object, or `None` when braces never
/// balance. JSON string contents (including escaped quotes) are skipped so
/// braces inside strings do not affect the depth count.
///
/// `start_index` must lie on a char boundary (callers pass `char_indices`
/// offsets).
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth: u32 = 0;
    let mut in_string = false;
    let mut escape = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        if in_string {
            if escape {
                escape = false;
            } else if ch == '\\' {
                escape = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            '}' => {
                // A '}' before any '{' means the span cannot balance.
                if depth == 0 {
                    return None;
                }
                depth -= 1;
                if depth == 0 {
                    let end = start_index + offset + ch.len_utf8();
                    return Some(raw[start_index..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
1101
1102fn extract_last_parsable_object(raw: &str) -> Option<&str> {
1103    let starts: Vec<usize> = raw
1104        .char_indices()
1105        .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
1106        .collect();
1107
1108    for start in starts.into_iter().rev() {
1109        let Some(candidate) = extract_balanced_object_from(raw, start) else {
1110            continue;
1111        };
1112        if serde_json::from_str::<Value>(candidate).is_ok() {
1113            return Some(candidate);
1114        }
1115    }
1116
1117    None
1118}
1119
/// Best-effort location of a JSON object inside raw streamed text: prefer
/// the last ```json fenced block, otherwise the last balanced, parsable
/// `{...}` object.
fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
}
1123
/// Parse the accumulated raw text of a streamed structured completion into
/// a JSON payload.
///
/// With `heal == false`: strict parse of the whole text first, then of the
/// best candidate substring (fenced block or last balanced object); any
/// failure becomes an error string. With `heal == true`: the candidate (or,
/// if none is found, the whole text) is run through the lenient
/// `JsonishParser`, whose confidence is reported in the result.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the stream may already be exactly one JSON document.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        // Otherwise try to extract an embedded JSON object and parse that.
        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: fall back to the full raw text when no candidate is found,
    // and let the lenient parser repair it.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
1162
/// Flattened event record delivered to `YamlWorkflowEventSink` implementors
/// during a run; fields not relevant to a given `event_type` are `None`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Discriminator naming the kind of event.
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Incremental streamed text for token events.
    pub delta: Option<String>,
    /// Whether a delta is structured output or surrounding "thinking" text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    /// Free-form extra data attached by the emitter.
    pub metadata: Option<Value>,
}
1177
/// Message role, re-exported under a workflow-centric name.
pub type WorkflowMessageRole = Role;

/// A single chat message carried through the workflow.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    // Accepts both `tool_call_id` and `toolCallId` spellings on input.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
1189
/// Record of one template expression resolved against workflow data.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    // NOTE(review): presumably this binding's position among the template's
    // expressions — confirm against the code that builds these.
    pub index: usize,
    pub expression: String,
    // Context path the expression was resolved from.
    pub source_path: String,
    pub resolved: Value,
    pub resolved_type: String,
    // True when the expression did not resolve to a value.
    pub missing: bool,
}
1199
/// Severity of a workflow validation finding.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}

/// A single validation finding, optionally tied to a specific node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    pub node_id: Option<String>,
    /// Machine-readable code identifying the diagnostic kind.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description of the finding.
    pub message: String,
}
1213
/// Errors produced while loading, validating, or executing a YAML workflow.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    // --- file loading / parsing ---
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("rejected workflow yaml '{path}': {reason}")]
    FileRejected { path: String, reason: String },
    // --- graph structure ---
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    // --- execution ---
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    // --- validation / input / runtime ---
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1260
/// Receiver for workflow events; implementors may also signal cancellation.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called once per emitted workflow event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Returning `true` is treated as cancellation by the runner
    /// (surfaced as `YamlWorkflowRunError::EventSinkCancelled`).
    fn is_cancelled(&self) -> bool {
        false
    }
}

/// Sink that discards all events and never cancels.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1274
/// Canonical message used when the workflow event sink reports cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    "workflow event callback cancelled"
}
1278
1279fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1280    event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1281}
1282
/// Errors raised when converting a YAML workflow into the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1292
1293/// Render a YAML workflow graph as Mermaid flowchart.
1294pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1295    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1296        return workflow_to_mermaid(&ir);
1297    }
1298
1299    yaml_workflow_to_mermaid_fallback(workflow)
1300}
1301
/// Direct YAML -> Mermaid rendering, used when IR conversion fails.
///
/// Draws one rectangular node per workflow node labelled "id\n(kind)", a
/// rounded bubble per declared LLM tool (dash-linked to its node), solid
/// edges for workflow edges, and labelled edges for switch branches plus
/// each switch's "default" target.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "  {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        // One rounded "tool bubble" per tool declared on an llm_call node.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "  {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "  {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // (from, label, to) triples; set semantics drop duplicate edges.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort so the rendered diagram is deterministic across runs.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "  {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "  {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    // Style all tool bubbles with a shared class.
    if !tool_node_ids.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1383
1384fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1385    llm.tools
1386        .iter()
1387        .map(|tool| match tool {
1388            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1389            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1390        })
1391        .collect()
1392}
1393
1394/// Load a YAML workflow file and render it as Mermaid flowchart.
1395pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
1396    let (canonical_path, workflow) = load_workflow_yaml_file(workflow_path)?;
1397
1398    let referenced_subgraphs = discover_referenced_subgraphs(canonical_path.as_path(), &workflow)?;
1399    if referenced_subgraphs.is_empty() {
1400        return Ok(yaml_workflow_to_mermaid(&workflow));
1401    }
1402
1403    Ok(yaml_workflow_to_mermaid_with_subgraphs(
1404        &workflow,
1405        &referenced_subgraphs,
1406    ))
1407}
1408
/// A sibling workflow referenced by the main graph.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    // Identifier the main workflow used to reference this subgraph.
    alias: String,
    workflow: YamlWorkflow,
}

/// Intermediate render state for one workflow inside a combined diagram.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    // Mermaid statements for this block (indented later by the caller).
    lines: Vec<String>,
    // All tool bubble ids, used for shared classDef styling.
    tool_node_ids: Vec<String>,
    // Ids of tool bubbles named "run_workflow_graph"; callers link these to
    // subgraph entry nodes.
    run_workflow_tool_node_ids: Vec<String>,
    // Prefixed Mermaid id of the workflow's entry node.
    entry_node_id: String,
}
1422
/// Render the main workflow plus referenced sibling workflows as one
/// Mermaid flowchart, each inside its own `subgraph` cluster, with dashed
/// "calls" edges from run_workflow_graph tool bubbles to subgraph entries.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "  subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("    {line}"));
    }
    lines.push("  end".to_string());

    // Tool bubble ids from every block, styled together at the end.
    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "  subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("    {line}"));
        }
        lines.push("  end".to_string());

        // Dashed "calls" edge from each run_workflow_graph tool in the main
        // graph to this subgraph's entry node.
        // NOTE(review): every such tool is linked to every subgraph, even if
        // it only references one of them — confirm this is intended.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "  {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    if !all_tool_nodes.is_empty() {
        lines.push("  classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("  class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1473
/// Render one workflow's nodes and edges as Mermaid statements, with every
/// node id namespaced by `prefix` so multiple blocks can share a diagram.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        // Rounded bubble per declared tool; run_workflow_graph tools are
        // tracked separately so callers can link them to subgraph entries.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // (from, label, to) triples; set semantics drop duplicate edges.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort so the rendered diagram is deterministic across runs.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1549
1550fn discover_referenced_subgraphs(
1551    workflow_path: &Path,
1552    workflow: &YamlWorkflow,
1553) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
1554    let workflow_ids = referenced_workflow_ids(workflow);
1555    if workflow_ids.is_empty() {
1556        return Ok(Vec::new());
1557    }
1558
1559    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
1560    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;
1561
1562    let mut discovered = Vec::new();
1563    let mut seen = HashSet::new();
1564
1565    for workflow_id in workflow_ids {
1566        if seen.contains(&workflow_id) {
1567            continue;
1568        }
1569
1570        if let Some((_, subworkflow)) = sibling_workflows
1571            .iter()
1572            .find(|(key, _)| key == &workflow_id)
1573        {
1574            discovered.push(MermaidSubgraphWorkflow {
1575                alias: workflow_id.clone(),
1576                workflow: subworkflow.clone(),
1577            });
1578            seen.insert(workflow_id);
1579        }
1580    }
1581
1582    Ok(discovered)
1583}
1584
/// Load every other YAML workflow file in `parent_dir`.
///
/// Each successfully parsed sibling is registered twice: under its file
/// stem and under its declared workflow id, so lookups by either key work.
/// The target workflow itself is skipped (compared via canonical paths) and
/// non-YAML entries are ignored; unreadable or unparsable siblings abort
/// with an error.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    // Fall back to the raw path when canonicalization fails.
    let canonical_target =
        std::fs::canonicalize(workflow_path).unwrap_or_else(|_| workflow_path.to_path_buf());
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // Skip the workflow we are resolving siblings for.
        let canonical_path = std::fs::canonicalize(&path).unwrap_or(path.clone());
        if canonical_path == canonical_target {
            continue;
        }

        let (_, subworkflow) = load_workflow_yaml_file(path.as_path())?;

        // Register under the file stem (when valid UTF-8) and the workflow id.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((stem.to_string(), subworkflow.clone()));
        }
        results.push((subworkflow.id.clone(), subworkflow));
    }

    Ok(results)
}
1622
/// Collect workflow ids referenced by run_workflow_graph tools, both from
/// the tool schemas themselves and from `"workflow_id"` literals in node
/// prompts, de-duplicated in first-seen order.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    if seen.insert(workflow_id.clone()) {
                        ids.push(workflow_id);
                    }
                }

                // The prompt is rescanned for each run_workflow_graph tool on
                // this node; `seen` keeps the result de-duplicated regardless.
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        if seen.insert(workflow_id.clone()) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1658
1659fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1660    let schema = match tool {
1661        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1662        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1663    };
1664
1665    let mut ids = Vec::new();
1666    if let Some(schema) = schema {
1667        if let Some(workflow_prop) = schema
1668            .get("properties")
1669            .and_then(Value::as_object)
1670            .and_then(|properties| properties.get("workflow_id"))
1671        {
1672            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1673                ids.push(value.to_string());
1674            }
1675
1676            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1677                for value in enum_values {
1678                    if let Some(value) = value.as_str() {
1679                        ids.push(value.to_string());
1680                    }
1681                }
1682            }
1683        }
1684    }
1685
1686    ids
1687}
1688
/// Scan free-form prompt text for `"workflow_id": "<value>"` occurrences
/// and return each quoted, non-empty value in order of appearance.
///
/// The scan is lenient: after each `"workflow_id"` key it skips to the next
/// `:`, then only records a value when one follows as a closed quoted
/// string; otherwise it resumes scanning after the colon.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";
    let mut found = Vec::new();
    let mut cursor = prompt;

    while let Some(key_index) = cursor.find(KEY) {
        let after_key = &cursor[key_index + KEY.len()..];
        let Some(colon) = after_key.find(':') else {
            break;
        };

        let after_colon = &after_key[colon + 1..];
        if let Some(body) = after_colon.trim_start().strip_prefix('"') {
            if let Some(close) = body.find('"') {
                let id = body[..close].trim();
                if !id.is_empty() {
                    found.push(id.to_string());
                }
                cursor = &body[close + 1..];
                continue;
            }
        }

        cursor = after_colon;
    }

    found
}
1716
/// True when the path's extension is exactly "yaml" or "yml"
/// (case-sensitive, matching the original behavior).
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext == "yaml" || ext == "yml")
        .unwrap_or(false)
}
1723
1724fn yaml_value_depth(value: &serde_yaml::Value, depth: usize) -> usize {
1725    match value {
1726        serde_yaml::Value::Sequence(items) => items
1727            .iter()
1728            .map(|item| yaml_value_depth(item, depth + 1))
1729            .max()
1730            .unwrap_or(depth),
1731        serde_yaml::Value::Mapping(map) => map
1732            .values()
1733            .map(|item| yaml_value_depth(item, depth + 1))
1734            .max()
1735            .unwrap_or(depth),
1736        _ => depth,
1737    }
1738}
1739
/// Load and pre-validate a workflow YAML file.
///
/// Validation pipeline, in order: canonicalize the path, require a regular
/// file with a .yaml/.yml extension, enforce the byte-size limit, read the
/// contents, parse to untyped YAML, enforce the nesting-depth limit, and
/// finally deserialize into a typed `YamlWorkflow`.
///
/// Returns the canonical path together with the parsed workflow.
fn load_workflow_yaml_file(
    workflow_path: &Path,
) -> Result<(PathBuf, YamlWorkflow), YamlWorkflowRunError> {
    let canonical_path =
        std::fs::canonicalize(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let metadata =
        std::fs::metadata(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
            path: canonical_path.display().to_string(),
            source,
        })?;

    if !metadata.is_file() {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: "path must reference a regular file".to_string(),
        });
    }

    if !is_yaml_file(&canonical_path) {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: "workflow file extension must be .yaml or .yml".to_string(),
        });
    }

    // Size is checked via metadata so oversized files are rejected before
    // their contents are read into memory.
    if metadata.len() > MAX_WORKFLOW_YAML_BYTES {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: format!(
                "workflow yaml is too large ({} bytes > {} bytes)",
                metadata.len(),
                MAX_WORKFLOW_YAML_BYTES
            ),
        });
    }

    let contents =
        std::fs::read_to_string(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
            path: canonical_path.display().to_string(),
            source,
        })?;

    // Parse to an untyped document first so the depth limit can be checked
    // before typed deserialization.
    let yaml_value: serde_yaml::Value =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: canonical_path.display().to_string(),
            source,
        })?;

    let depth = yaml_value_depth(&yaml_value, 1);
    if depth > MAX_WORKFLOW_YAML_DEPTH {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: format!(
                "workflow yaml nesting depth {} exceeds limit {}",
                depth, MAX_WORKFLOW_YAML_DEPTH
            ),
        });
    }

    let workflow: YamlWorkflow =
        serde_yaml::from_value(yaml_value).map_err(|source| YamlWorkflowRunError::Parse {
            path: canonical_path.display().to_string(),
            source,
        })?;

    Ok((canonical_path, workflow))
}
1811
1812fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1813    sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1814}
1815
/// Name of a declared tool, regardless of declaration flavor.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1822
/// Lowers a parsed [`YamlWorkflow`] into the canonical workflow IR
/// ([`WorkflowDefinition`], version [`WORKFLOW_IR_V0`]).
///
/// A synthetic start node (`YAML_START_NODE_ID`) pointing at the YAML
/// `entry_node` is prepended. Node lowering:
/// - `llm_call` → `NodeKind::Tool` invoking the built-in `YAML_LLM_TOOL_ID`
///   tool, with the call parameters packed into the tool input JSON.
/// - `custom_worker` → `NodeKind::Tool` for the declared handler, with the
///   handler file smuggled through the payload as `__handler_file`.
/// - `switch` → `NodeKind::Router` with each branch condition rewritten
///   into IR path syntax.
///
/// # Errors
/// - [`YamlToIrError::MissingEntry`] when `entry_node` names no node.
/// - [`YamlToIrError::UnsupportedNode`] for features the IR cannot represent
///   yet (globals mutation, inline `llm_call.tools`, unrewritable switch
///   conditions, or an unknown node type).
/// - [`YamlToIrError::MultipleOutgoingEdge`] (via `single_next_for_node`)
///   when a linear node has more than one outgoing edge.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    // Entry node must exist before any lowering work happens.
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of outgoing edges, used to resolve each linear node's
    // single successor.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    // +1 for the synthetic start node.
    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet — bail out so the
            // caller can fall back to the legacy (non-IR) execution path.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            // Inline tool declarations are likewise unsupported in the IR.
            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    // The IR tool executor unpacks these keys at run time; see
                    // the YAML_LLM_TOOL_ID branch of the IR tool executor.
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "max_tokens": llm.max_tokens,
                        "temperature": llm.temperature,
                        "top_p": llm.top_p,
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as llm nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: {
                        // Carry the optional handler file inside the payload
                        // under a reserved key; the executor removes it before
                        // invoking the worker. Note: only injected when the
                        // payload is a JSON object.
                        let mut payload = node
                            .config
                            .as_ref()
                            .and_then(|c| c.payload.clone())
                            .unwrap_or_else(|| json!({}));
                        if let Some(payload_obj) = payload.as_object_mut() {
                            payload_obj.insert(
                                "__handler_file".to_string(),
                                worker
                                    .handler_file
                                    .as_ref()
                                    .map(|value| Value::String(value.clone()))
                                    .unwrap_or(Value::Null),
                            );
                        }
                        payload
                    },
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Each branch condition is rewritten from YAML path syntax
            // ($.nodes.X.output...) into IR path syntax ($.node_outputs.X...).
            let mut routes = Vec::with_capacity(switch.branches.len());
            for branch in &switch.branches {
                let rewritten =
                    rewrite_yaml_condition_to_ir(&branch.condition).map_err(|reason| {
                        YamlToIrError::UnsupportedNode {
                            node_id: node.id.clone(),
                            reason: format!(
                                "failed to rewrite switch condition '{}': {}",
                                branch.condition, reason
                            ),
                        }
                    })?;
                routes.push(RouterRoute {
                    when: rewritten,
                    next: branch.target.clone(),
                });
            }
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes,
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        // None of the known node types matched.
        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1996
1997fn single_next_for_node(
1998    outgoing: &HashMap<&str, Vec<&str>>,
1999    node_id: &str,
2000) -> Result<Option<String>, YamlToIrError> {
2001    match outgoing.get(node_id) {
2002        None => Ok(None),
2003        Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
2004        Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
2005            node_id: node_id.to_string(),
2006        }),
2007    }
2008}
2009
/// Rewrites a YAML switch condition into IR path syntax.
///
/// Two textual rewrites are applied outside of quoted strings:
/// - `$.nodes.` becomes `$.node_outputs.`
/// - a `.output` path segment is dropped when followed by end-of-input,
///   whitespace, or path/operator punctuation (so `.outputs` is untouched).
///
/// Content inside single- or double-quoted strings is copied verbatim,
/// honoring backslash escapes.
///
/// # Errors
/// Returns an error when a quoted string is never terminated.
fn rewrite_yaml_condition_to_ir(expr: &str) -> Result<String, String> {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr;
    let mut quote_context: Option<char> = None;
    let mut escaped = false;

    while let Some(current) = rest.chars().next() {
        // Inside a quoted string: copy verbatim, tracking escapes so an
        // escaped quote does not close the string.
        if let Some(quote) = quote_context {
            out.push(current);
            if escaped {
                escaped = false;
            } else if current == '\\' {
                escaped = true;
            } else if current == quote {
                quote_context = None;
            }
            rest = &rest[current.len_utf8()..];
            continue;
        }

        // Opening quote: enter quoted mode.
        if current == '"' || current == '\'' {
            quote_context = Some(current);
            out.push(current);
            rest = &rest[1..];
            continue;
        }

        // `$.nodes.` -> `$.node_outputs.`
        if let Some(tail) = rest.strip_prefix("$.nodes.") {
            out.push_str("$.node_outputs.");
            rest = tail;
            continue;
        }

        // Drop a `.output` segment at a segment boundary.
        if let Some(tail) = rest.strip_prefix(".output") {
            let at_boundary = match tail.chars().next() {
                None => true,
                Some(ch) => {
                    ch.is_whitespace()
                        || matches!(
                            ch,
                            '.' | ')' | ']' | '}' | ',' | '=' | '!' | '<' | '>' | '&' | '|'
                        )
                }
            };
            if at_boundary {
                rest = tail;
                continue;
            }
        }

        out.push(current);
        rest = &rest[current.len_utf8()..];
    }

    if quote_context.is_some() {
        return Err("condition contains an unterminated quoted string".to_string());
    }

    Ok(out)
}
2063
/// Returns `true` when `chars[start..]` begins with `pattern`.
///
/// `start` values past the end of the slice return `false`; an empty pattern
/// at an in-range `start` returns `true`.
///
/// Fix: the previous version collected `pattern` into a fresh `Vec<char>` on
/// every call, and this helper runs once per scanned character — compare via
/// iterators instead, with no allocation.
fn starts_with_chars(chars: &[char], start: usize, pattern: &str) -> bool {
    match chars.get(start..) {
        Some(tail) => {
            let mut tail_iter = tail.iter();
            pattern
                .chars()
                .all(|expected| tail_iter.next() == Some(&expected))
        }
        None => false,
    }
}
2071
/// Decides whether the character following a `.output` match terminates the
/// path segment: end-of-input, whitespace, or one of the path/operator
/// punctuation characters.
fn is_output_segment_boundary(next: Option<char>) -> bool {
    let Some(ch) = next else {
        return true;
    };
    ch.is_whitespace() || ".)]},=!<>&|".contains(ch)
}
2084
/// Produces a Mermaid-safe identifier: every character outside
/// `[A-Za-z0-9_]` becomes `_`, and ids that do not start with an ASCII
/// letter or underscore (including the empty id) gain an `n_` prefix.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_legally = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut result = String::with_capacity(id.len() + 2);
    if !starts_legally {
        result.push_str("n_");
    }
    for ch in id.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            result.push(ch);
        } else {
            result.push('_');
        }
    }
    result
}
2108
/// Escapes double quotes so the label can sit inside a quoted Mermaid label.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2112
/// Fully-resolved parameters for one LLM call issued on behalf of a YAML
/// workflow node, handed to a [`YamlWorkflowLlmExecutor`].
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    /// Id of the workflow node issuing this call.
    pub node_id: String,
    /// Whether this node is the workflow's terminal node.
    /// NOTE(review): the IR runtime path always sets this to `false` — confirm
    /// the legacy path's semantics against the executor implementation.
    pub is_terminal_node: bool,
    /// Mirrors the node's `stream_json_as_text` flag (defaults to `false`).
    pub stream_json_as_text: bool,
    /// Model name after applying any run-options override
    /// (see `resolve_requested_model`).
    pub model: String,
    /// Optional sampling limits forwarded from the node's `llm_call` config.
    pub max_tokens: Option<u32>,
    pub temperature: Option<f32>,
    pub top_p: Option<f32>,
    /// Conversation history extracted from the execution context via the
    /// node's `messages_path`, when one was configured.
    pub messages: Option<Vec<Message>>,
    /// Whether the interpolated prompt is appended as a user message
    /// (defaults to `true`).
    pub append_prompt_as_user: bool,
    /// Prompt after template interpolation against the execution context.
    pub prompt: String,
    /// Raw prompt template, prior to interpolation.
    pub prompt_template: String,
    /// Template placeholder bindings gathered by `collect_template_bindings`.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// JSON schema the structured output must satisfy (node `output_schema`,
    /// or `default_llm_output_schema()` when unset).
    pub schema: Value,
    /// Whether the completion should be streamed.
    pub stream: bool,
    /// Whether JSON-healing of malformed output is enabled.
    pub heal: bool,
    /// Tools exposed to the model. Empty on the IR runtime path.
    pub tools: Vec<YamlResolvedTool>,
    /// Tool selection constraint, if any.
    pub tool_choice: Option<ToolChoice>,
    /// Upper bound on model<->tool roundtrips (the IR path uses 1).
    pub max_tool_roundtrips: u8,
    /// Global key under which tool calls are recorded, if configured.
    /// NOTE(review): consumption of this key is not visible in this chunk.
    pub tool_calls_global_key: Option<String>,
    /// Tool tracing verbosity (the IR path uses `YamlToolTraceMode::Off`).
    pub tool_trace_mode: YamlToolTraceMode,
    /// Snapshot of the execution context the prompt was rendered against.
    pub execution_context: Value,
    /// Convenience copy of `context.input.email_text` (empty when absent).
    pub email_text: String,
    /// Telemetry identifiers/flags for span correlation.
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub tenant_context: YamlWorkflowTraceTenantContext,
    /// Whether tracing spans should be emitted for this call.
    pub trace_sampled: bool,
}
2142
/// A tool made available to an LLM call: the wire-level definition plus an
/// optional JSON schema for the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    /// Definition sent to the model (name, description, parameters).
    pub definition: ToolDefinition,
    /// Optional schema describing the tool's result.
    /// NOTE(review): how/whether this is enforced is not visible here.
    pub output_schema: Option<Value>,
}
2148
/// Executes structured LLM completions on behalf of YAML workflow nodes.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Runs the completion described by `request`, optionally forwarding
    /// progress/streaming events to `event_sink`.
    ///
    /// # Errors
    /// Returns a human-readable message on failure; callers wrap it in their
    /// own error types.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
2157
/// Executes `custom_worker` workflow nodes.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Invokes the named `handler` (optionally backed by `handler_file`)
    /// with the node's `payload`, the workflow's `email_text`, and the
    /// current execution `context`; returns the worker's JSON output.
    ///
    /// # Errors
    /// Returns a human-readable message on failure.
    async fn execute(
        &self,
        handler: &str,
        handler_file: Option<&str>,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
2169
/// Runs `workflow` against a [`SimpleAgentsClient`], with an optional
/// custom-worker executor, an optional event sink, and explicit run options.
///
/// Convenience adapter: wraps the client in a `BorrowedClientExecutor` and
/// delegates to `run_workflow_yaml_with_custom_worker_and_events_and_options`.
pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let executor = BorrowedClientExecutor {
        client,
        custom_worker,
        run_options: options.clone(),
    };
    run_workflow_yaml_with_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        &executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2193
/// Runs `workflow` with a caller-supplied LLM executor, optional
/// custom-worker executor, optional event sink, and run options.
///
/// Thin facade: the actual logic lives in the `execute` submodule.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    execute::run_workflow_yaml_with_custom_worker_and_events_and_options_impl(
        workflow,
        workflow_input,
        executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2212
/// Attempts to execute a YAML workflow through the canonical IR runtime.
///
/// Returns `Ok(None)` when the workflow cannot be lowered to (or validated
/// as) supported IR — the caller then falls back to the legacy execution
/// path. Returns `Ok(Some(output))` on a successful IR run, or an error for
/// genuinely invalid input / runtime failures.
///
/// All LLM nodes were lowered to `YAML_LLM_TOOL_ID` tool nodes by
/// `yaml_workflow_to_ir`, so the runtime is given a stub `LlmExecutor` and a
/// tool executor that dispatches either to the YAML LLM executor or to the
/// custom worker.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Unsupported-feature errors are expected and trigger the legacy
    // fallback; anything else is a real input error.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    // IR that fails validation also falls back to the legacy path.
    if validate_and_normalize(&ir).is_err() {
        return Ok(None);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open the top-level workflow span only when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // The IR never contains NodeKind::Llm nodes here (LLM calls were lowered
    // to tool nodes), so any call into this stub is a bug.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Tool executor bridging IR tool nodes back to the YAML-world executors.
    // Token/model bookkeeping lives behind Mutexes because `execute_tool`
    // takes `&self`.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // Branch 1: lowered LLM node — unpack the parameters packed by
            // yaml_workflow_to_ir and delegate to the YAML LLM executor.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let max_tokens = input
                    .input
                    .get("max_tokens")
                    .and_then(Value::as_u64)
                    .and_then(|value| u32::try_from(value).ok());
                let temperature = input
                    .input
                    .get("temperature")
                    .and_then(Value::as_f64)
                    .map(|value| value as f32);
                let top_p = input
                    .input
                    .get("top_p")
                    .and_then(Value::as_f64)
                    .map(|value| value as f32);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // Conversation history is pulled from the execution context
                // only when a messages_path was configured.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // The IR path does not support tools/globals, hence the
                // fixed empty/off values below.
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    max_tokens,
                    temperature,
                    top_p,
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    tenant_context: self.tenant_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record usage/model metrics on success; poisoned locks are
                // silently skipped (metrics are best-effort).
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Branch 2: custom-worker node. Without a worker executor the
            // tool is simply unknown.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            // Strip the reserved __handler_file key that
            // yaml_workflow_to_ir injected into the payload.
            let mut payload = input.input.clone();
            let handler_file = payload
                .as_object_mut()
                .and_then(|obj| obj.remove("__handler_file"))
                .and_then(|value| value.as_str().map(ToString::to_string));
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-handler span, only when sampled.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                apply_trace_identity_attributes(span.as_mut(), self.trace_id.as_deref());
                span.set_attribute("handler_name", input.tool.as_str());
                apply_trace_tenant_attributes_from_tenant(span.as_mut(), &self.tenant_context);
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Rebuild run options so the worker inherits trace/tenant info.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(
                    &input.tool,
                    handler_file.as_deref(),
                    &payload,
                    email_text,
                    &worker_context,
                )
                .await
                .map_err(ToolExecutionError::Failed);

            // Close the handler span with a success/error event before
            // propagating the result.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    // Validation already ran above, so the runtime skips its own pass.
    let runtime_options = WorkflowRuntimeOptions {
        validate_before_run: false,
        ..WorkflowRuntimeOptions::default()
    };
    let runtime = WorkflowRuntime::new(ir, &NoopLlm, Some(&tool_executor), runtime_options);

    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        // A late validation failure still falls back to the legacy path.
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-wrap node outputs in the legacy {"output": ...} shape, dropping the
    // synthetic start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    // Assemble the execution trace and per-node timing/usage rows. The IR
    // runtime does not report per-node wall time, hence elapsed_ms: 0.
    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        let node_id = execution.node_id;
        trace.push(node_id.clone());
        let usage = node_usage_map.get(&node_id);
        if let Some(usage) = usage {
            llm_node_metrics.insert(
                node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: node_id.clone(),
            node_kind: "ir_runtime".to_string(),
            model_name: llm_node_models.get(&node_id).cloned(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    // Finalize the workflow span with the run's input/output and stats, then
    // flush so the data survives short-lived processes.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_langfuse_trace_input_output_attributes(
            span.as_mut(),
            workflow_input,
            &output,
            options.telemetry.payload_mode,
        );
        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(output))
}
2665
2666async fn execute_subworkflow_tool_call(
2667    payload: &Value,
2668    context: &Value,
2669    client: &SimpleAgentsClient,
2670    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2671    parent_options: &YamlWorkflowRunOptions,
2672    parent_trace_context: Option<&TraceContext>,
2673    resolved_trace_id: Option<&str>,
2674) -> Result<Value, String> {
2675    let workflow_id = payload
2676        .get("workflow_id")
2677        .and_then(Value::as_str)
2678        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;
2679
2680    let input_context = context
2681        .get("input")
2682        .and_then(Value::as_object)
2683        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;
2684
2685    let registry = input_context
2686        .get("workflow_registry")
2687        .and_then(Value::as_object)
2688        .ok_or_else(|| {
2689            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
2690                .to_string()
2691        })?;
2692
2693    let workflow_path = registry
2694        .get(workflow_id)
2695        .and_then(Value::as_str)
2696        .ok_or_else(|| {
2697            format!(
2698                "workflow_registry has no entry for workflow_id '{}'",
2699                workflow_id
2700            )
2701        })?;
2702
2703    let parent_depth = input_context
2704        .get("__subgraph_depth")
2705        .and_then(Value::as_u64)
2706        .unwrap_or(0);
2707    let max_depth = input_context
2708        .get("__subgraph_max_depth")
2709        .and_then(Value::as_u64)
2710        .unwrap_or(3);
2711
2712    if parent_depth >= max_depth {
2713        return Err(format!(
2714            "run_workflow_graph depth limit reached (depth={}, max={})",
2715            parent_depth, max_depth
2716        ));
2717    }
2718
2719    let mut subworkflow_input = payload
2720        .get("input")
2721        .and_then(Value::as_object)
2722        .cloned()
2723        .unwrap_or_default();
2724
2725    if !subworkflow_input.contains_key("messages") {
2726        if let Some(messages) = input_context.get("messages") {
2727            subworkflow_input.insert("messages".to_string(), messages.clone());
2728        }
2729    }
2730
2731    if !subworkflow_input.contains_key("email_text") {
2732        if let Some(email_text) = input_context.get("email_text") {
2733            subworkflow_input.insert("email_text".to_string(), email_text.clone());
2734        }
2735    }
2736
2737    if !subworkflow_input.contains_key("workflow_registry") {
2738        subworkflow_input.insert(
2739            "workflow_registry".to_string(),
2740            Value::Object(registry.clone()),
2741        );
2742    }
2743
2744    subworkflow_input.insert(
2745        "__subgraph_depth".to_string(),
2746        Value::Number(serde_json::Number::from(parent_depth + 1)),
2747    );
2748    subworkflow_input.insert(
2749        "__subgraph_max_depth".to_string(),
2750        Value::Number(serde_json::Number::from(max_depth)),
2751    );
2752
2753    let subworkflow_options =
2754        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);
2755
2756    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
2757        Path::new(workflow_path),
2758        &Value::Object(subworkflow_input),
2759        client,
2760        custom_worker,
2761        None,
2762        &subworkflow_options,
2763    )
2764    .await
2765    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;
2766
2767    Ok(json!({
2768        "workflow_id": workflow_id,
2769        "workflow_path": workflow_path,
2770        "terminal_node": output.terminal_node,
2771        "terminal_output": output.terminal_output,
2772        "trace": output.trace,
2773    }))
2774}
2775
2776fn build_subworkflow_options(
2777    parent_options: &YamlWorkflowRunOptions,
2778    parent_trace_context: Option<&TraceContext>,
2779    resolved_trace_id: Option<&str>,
2780) -> YamlWorkflowRunOptions {
2781    let mut subworkflow_options = parent_options.clone();
2782    if parent_trace_context.is_some() || resolved_trace_id.is_some() {
2783        let trace_context = YamlWorkflowTraceContextInput {
2784            trace_id: resolved_trace_id
2785                .map(|value| value.to_string())
2786                .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
2787            span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
2788            parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
2789            traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
2790            tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
2791            baggage: parent_trace_context
2792                .map(|ctx| ctx.baggage.clone())
2793                .unwrap_or_default(),
2794        };
2795        subworkflow_options.trace.context = Some(trace_context);
2796    }
2797    subworkflow_options
2798}
2799
/// Root of a YAML workflow document: a graph of nodes plus explicit edges,
/// entered at `entry_node`. The `id` is echoed into run output.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    pub id: String,
    // Id of the node where execution starts.
    pub entry_node: String,
    // Node definitions; defaults to empty when absent from the document.
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    // Unconditional `from -> to` transitions between nodes.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
2809
/// A single workflow node: an id, a one-of-style `node_type`, and an
/// optional per-node `config` (prompt, payload, globals, schema).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    pub id: String,
    // Which behavior this node has; see `YamlNodeType` and `kind_name`.
    pub node_type: YamlNodeType,
    pub config: Option<YamlNodeConfig>,
}
2816
2817impl YamlNode {
2818    fn kind_name(&self) -> &'static str {
2819        if self.node_type.llm_call.is_some() {
2820            "llm_call"
2821        } else if self.node_type.switch.is_some() {
2822            "switch"
2823        } else if self.node_type.custom_worker.is_some() {
2824            "custom_worker"
2825        } else if self.node_type.end.is_some() {
2826            "end"
2827        } else {
2828            "unknown"
2829        }
2830    }
2831}
2832
/// One-of container for a node's behavior. Deserialization rejects unknown
/// keys; if several variants are set, `YamlNode::kind_name` prefers them in
/// field order (llm_call first).
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
    // Terminal-node marker; payload shape is free-form JSON.
    pub end: Option<Value>,
}
2841
/// Configuration for an `llm_call` node: model selection, sampling and
/// streaming knobs, plus optional tool-calling setup. Unknown keys are
/// rejected at parse time.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct YamlLlmCall {
    // Model name forwarded to the completion request.
    pub model: String,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f32>,
    pub top_p: Option<f32>,
    // Streaming toggles — NOTE(review): exact semantics live in the
    // executor; confirm there.
    pub stream: Option<bool>,
    pub stream_json_as_text: Option<bool>,
    // Presumably opts into JSON healing of model output — verify in executor.
    pub heal: Option<bool>,
    // Presumably a context path from which prior messages are loaded.
    pub messages_path: Option<String>,
    pub append_prompt_as_user: Option<bool>,
    // How the `tools` list below is declared; defaults to the OpenAI shape.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    pub tool_choice: Option<YamlToolChoiceConfig>,
    // Presumably caps model<->tool round trips — confirm against the tool loop.
    pub max_tool_roundtrips: Option<u8>,
    // Global key under which emitted tool calls are stored, if any.
    pub tool_calls_global_key: Option<String>,
}
2862
/// Wire format used for the `tools` list of an `llm_call` node; serialized
/// in snake_case and defaulting to the OpenAI shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}
2870
/// A tool declaration in either supported shape. `untagged` makes serde try
/// variants in declaration order: the OpenAI shape (which requires a
/// `function` object) first, then the flat simplified shape.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
2877
/// OpenAI-style tool declaration: `{ "type": "function", "function": {...} }`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    // Tool kind; optional so documents with only `function` still parse.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}

/// The `function` payload of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    // JSON Schema for the tool's arguments (per the OpenAI tools format).
    pub parameters: Option<Value>,
    // Optional schema describing the tool's result.
    pub output_schema: Option<Value>,
}

/// Flat simplified tool declaration; unlike the OpenAI shape, the input
/// schema is required.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}
2900
/// Tool-choice setting in any of three accepted spellings: a bare mode
/// string, a `{ function: "name" }` shorthand, or the full OpenAI object.
/// Untagged, so variants are tried in that order.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
2908
/// A `switch` node: conditional branches plus a mandatory default target.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    // Node id taken when no branch condition matches.
    pub default: String,
}

/// One branch of a switch: a condition expression and the target node id.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}
2921
/// A `custom_worker` node delegating to a host-provided handler by name,
/// with an optional pointer to a handler source file. Unknown keys are
/// rejected at parse time.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct YamlCustomWorker {
    pub handler: String,
    pub handler_file: Option<String>,
}
2928
/// Per-node configuration shared by all node kinds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    // Prompt template for llm_call nodes (templating like
    // `{{ input.email_text }}` appears in test fixtures).
    pub prompt: Option<String>,
    // Expected output schema; `schema` is accepted as a YAML alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    // Free-form payload, e.g. forwarded to custom workers.
    pub payload: Option<Value>,
    // Globals to set; values are strings (presumably templates — confirm in
    // the globals module).
    pub set_globals: Option<HashMap<String, String>>,
    // Globals updated via an operation; see `YamlGlobalUpdate`.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}

/// A single global-variable update: an operation name (`op`) plus operands —
/// a source path (`from`) and/or a numeric delta (`by`).
/// NOTE(review): valid `op` values are enforced elsewhere — confirm against
/// the globals module.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    pub op: String,
    pub from: Option<String>,
    pub by: Option<f64>,
}
2945
/// An unconditional edge from one node id to another.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
2951
2952#[cfg(test)]
2953mod tests {
2954    use super::*;
2955    use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
2956    use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
2957    use simple_agent_type::tool::{ToolCallFunction, ToolType};
2958    use simple_agent_type::{Result as SaResult, SimpleAgentsError};
2959    use simple_agents_core::SimpleAgentsClientBuilder;
2960    use std::collections::BTreeMap;
2961    use std::fs;
2962    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2963    use std::sync::{Arc, Mutex, OnceLock};
2964
2965    fn stream_debug_env_lock() -> &'static Mutex<()> {
2966        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2967        LOCK.get_or_init(|| Mutex::new(()))
2968    }
2969
    // LLM executor returning canned classifications keyed off prompt text.
    struct MockExecutor;

    // Event sink that records every emitted event for later assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    // Event sink that reports itself cancelled after the first event,
    // exercising cooperative-cancellation paths.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }

    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        // Any event flips the flag; subsequent `is_cancelled` polls return true.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }

    // LLM executor that counts how many times it is invoked.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    // Custom worker that captures the context value it was called with.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    // Custom worker that records the `handler_file` it received.
    struct HandlerFileCapturingWorker {
        handler_file: Mutex<Option<String>>,
    }

    // Provider driving one tool-call round trip, then a final answer.
    struct ToolLoopProvider;

    // Provider that always requests a tool the workflow does not declare.
    struct UnknownToolProvider;

    // Provider whose usage block includes reasoning tokens.
    struct ReasoningUsageProvider;

    // Tool-loop provider that also reports reasoning tokens on each leg.
    struct ToolLoopReasoningProvider;

    // Workflow span that stores attributes in a map for inspection.
    #[derive(Default)]
    struct CapturingSpan {
        attributes: BTreeMap<String, String>,
    }
3014
    impl WorkflowSpan for CapturingSpan {
        // Stores the attribute so tests can assert on the accumulated map.
        fn set_attribute(&mut self, key: &str, value: &str) {
            self.attributes.insert(key.to_string(), value.to_string());
        }

        // Events and span end are irrelevant to these tests: no-ops.
        fn add_event(&mut self, _name: &str) {}

        fn end(self: Box<Self>) {}
    }
3024
    #[async_trait]
    impl Provider for ToolLoopProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Serializes the whole request into the provider body so `execute`
        // can reconstruct and inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        // Scripted conversation:
        //   1. tools declared, no tool result yet  -> one `get_customer_context` call
        //   2. tools declared, tool result present -> final structured JSON answer
        //   3. no tools at all                     -> echo the last user prompt
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            // A Role::Tool message means our earlier tool call was answered.
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // Leg 1: request the customer-context tool.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Leg 2: tool result arrived, finish with structured output.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // No tools: reply with a JSON body echoing the latest user prompt.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3120
    #[async_trait]
    impl Provider for UnknownToolProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Serializes the request into the provider body for `execute`.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
        }

        // Unconditionally responds with a call to `unknown_tool`, which the
        // workflow never declares — tests use this to drive the
        // unknown-tool error path.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let response = CompletionResponse {
                id: "resp_unknown_tool".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
                        id: "call_unknown".to_string(),
                        tool_type: ToolType::Function,
                        function: ToolCallFunction {
                            name: "unknown_tool".to_string(),
                            arguments: "{}".to_string(),
                        },
                    }]),
                    finish_reason: FinishReason::ToolCalls,
                    logprobs: None,
                }],
                usage: Usage::new(5, 2),
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3166
3167    #[async_trait]
3168    impl Provider for ReasoningUsageProvider {
3169        fn name(&self) -> &str {
3170            "openai"
3171        }
3172
3173        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
3174            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
3175            Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
3176        }
3177
3178        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3179            let request: CompletionRequest =
3180                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3181
3182            let mut usage = Usage::new(9, 5);
3183            usage.reasoning_tokens = Some(4);
3184            let response = CompletionResponse {
3185                id: "resp_reasoning".to_string(),
3186                model: request.model,
3187                choices: vec![CompletionChoice {
3188                    index: 0,
3189                    message: Message::assistant("{\"state\":\"ok\"}"),
3190                    finish_reason: FinishReason::Stop,
3191                    logprobs: None,
3192                }],
3193                usage,
3194                created: None,
3195                provider: Some(self.name().to_string()),
3196                healing_metadata: None,
3197            };
3198
3199            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3200            Ok(ProviderResponse::new(200, body))
3201        }
3202
3203        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3204            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3205        }
3206    }
3207
    #[async_trait]
    impl Provider for ToolLoopReasoningProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Serializes the request into the provider body for `execute`.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
        }

        // Two-leg tool loop like ToolLoopProvider, but each leg's usage also
        // carries reasoning tokens (2, then 3) for accounting tests.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            // A Role::Tool message means our earlier tool call was answered.
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // Leg 1: request the customer-context tool (2 reasoning tokens).
                let mut usage = Usage::new(10, 5);
                usage.reasoning_tokens = Some(2);
                CompletionResponse {
                    id: "resp_tool_reasoning_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Leg 2: final structured answer (3 reasoning tokens).
                let mut usage = Usage::new(12, 6);
                usage.reasoning_tokens = Some(3);
                CompletionResponse {
                    id: "resp_tool_reasoning_2".to_string(),
                    model: request.model,
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3280
    // Custom worker returning a fixed payload regardless of input.
    struct FixedToolWorker {
        payload: Value,
    }

    // Custom worker that counts `execute` invocations.
    struct CountingToolWorker {
        execute_calls: AtomicUsize,
    }
3288
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
        // Ignores every argument and returns the preconfigured payload.
        async fn execute(
            &self,
            _handler: &str,
            _handler_file: Option<&str>,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            Ok(self.payload.clone())
        }
    }
3302
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
        // Increments the shared counter and reports success.
        async fn execute(
            &self,
            _handler: &str,
            _handler_file: Option<&str>,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            self.execute_calls.fetch_add(1, Ordering::SeqCst);
            Ok(json!({"ok": true}))
        }
    }
3317
    #[async_trait]
    impl YamlWorkflowLlmExecutor for CountingExecutor {
        // Counts every invocation and returns a minimal structured payload
        // with no usage or tool calls.
        async fn complete_structured(
            &self,
            _request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: None,
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
3334
    impl YamlWorkflowEventSink for RecordingSink {
        // Appends a clone of every event to the shared vector for assertions.
        fn emit(&self, event: &YamlWorkflowEvent) {
            self.events
                .lock()
                .expect("recording sink lock should not be poisoned")
                .push(event.clone());
        }
    }
3343
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
        // Stores a clone of the incoming context for later inspection, then
        // reports success.
        async fn execute(
            &self,
            _handler: &str,
            _handler_file: Option<&str>,
            _payload: &Value,
            _email_text: &str,
            context: &Value,
        ) -> Result<Value, String> {
            let mut guard = self
                .context
                .lock()
                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
            *guard = Some(context.clone());
            Ok(json!({"ok": true}))
        }
    }
3362
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for HandlerFileCapturingWorker {
        // Records the `handler_file` argument (owned copy) for later
        // inspection, then reports success.
        async fn execute(
            &self,
            _handler: &str,
            handler_file: Option<&str>,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            let mut guard = self
                .handler_file
                .lock()
                .map_err(|_| "handler-file lock should not be poisoned".to_string())?;
            *guard = handler_file.map(ToString::to_string);
            Ok(json!({"ok": true}))
        }
    }
3381
    #[async_trait]
    impl YamlWorkflowLlmExecutor for MockExecutor {
        // Dispatches on distinctive substrings of each node's prompt and
        // returns a canned classification payload with fixed token usage;
        // any unrecognized prompt is an error so tests fail loudly.
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            let prompt = request.prompt;
            // Top-level classification node.
            if prompt.contains("exactly one category") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"category":"termination","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 10,
                        completion_tokens: 5,
                        total_tokens: 15,
                        reasoning_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Termination-subtype node.
            if prompt.contains("Determine termination subtype") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 12,
                        completion_tokens: 6,
                        total_tokens: 18,
                        reasoning_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Supply-chain-subtype node.
            if prompt.contains("Determine supply chain subtype") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 11,
                        completion_tokens: 4,
                        total_tokens: 15,
                        reasoning_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            Err("unexpected prompt".to_string())
        }
    }
3432
    // End-to-end run over an inline YAML graph: classify -> switch ->
    // subtype classify -> switch -> RAG custom worker. MockExecutor answers
    // "termination" then "repeated_offense", so the run must terminate at
    // `rag_termination_repeated_offense`; token totals are the sums of the
    // two mock usages (10+12 / 5+6 / 15+18).
    #[tokio::test]
    async fn runs_yaml_workflow_and_returns_step_timings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Determine termination subtype:
        {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let custom_worker = FixedToolWorker {
            payload: json!({"context": "mock"}),
        };
        let output = run_email_workflow_yaml_with_custom_worker(
            &workflow,
            "test",
            &MockExecutor,
            Some(&custom_worker),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.workflow_id, "email-intake-classification");
        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
        // Every traced step should have a matching timing entry.
        assert!(!output.step_timings.is_empty());
        assert_eq!(output.step_timings.len(), output.trace.len());
        assert!(output
            .outputs
            .contains_key("rag_termination_repeated_offense"));
        // Sums of the two mock LLM usages (10+12, 5+6, 15+18).
        assert_eq!(output.total_input_tokens, 22);
        assert_eq!(output.total_output_tokens, 11);
        assert_eq!(output.total_tokens, 33);
    }
3514
    #[tokio::test]
    async fn workflow_runner_executes_inline_workflow_with_executor() {
        // The builder-style WorkflowRunner must execute a single-node inline
        // workflow when given an email text and an executor.
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = WorkflowRunner::from_workflow(&workflow)
            .with_email_text("test")
            .with_executor(&MockExecutor)
            .run()
            .await
            .expect("builder execution should succeed");

        assert_eq!(output.workflow_id, "email-intake-classification");
        // The single llm_call node must have produced an output entry.
        assert!(output.outputs.contains_key("classify_top_level"));
    }
3542
    #[tokio::test]
    async fn workflow_runner_requires_executor() {
        // Running the builder without `.with_executor(...)` must fail with an
        // InvalidInput error rather than panicking or silently succeeding.
        let yaml = r#"
id: missing-executor
entry_node: end
nodes:
  - id: end
    node_type:
      end: {}
"#;
        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");

        let err = WorkflowRunner::from_workflow(&workflow)
            .with_input(&json!({"email_text":"x"}))
            .run()
            .await
            .expect_err("missing executor should fail");

        // The error message must name the missing piece explicitly.
        assert!(matches!(
            err,
            YamlWorkflowRunError::InvalidInput { message }
                if message.contains("workflow executor is required")
        ));
    }
3567
    #[tokio::test]
    async fn emits_resolved_llm_input_event_with_bindings() {
        // Verifies the `node_llm_input_resolved` telemetry event: it must carry
        // the model, stream/heal flags, the fully-rendered prompt, and one
        // binding record per template placeholder.
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_email_workflow_yaml_with_custom_worker_and_events(
            &workflow,
            "Need help with termination",
            &MockExecutor,
            None,
            Some(&sink),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.terminal_node, "classify_top_level");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let llm_event = events
            .iter()
            .find(|event| event.event_type == "node_llm_input_resolved")
            .expect("expected llm input telemetry event");

        let metadata = llm_event
            .metadata
            .as_ref()
            .expect("llm input event must include metadata");
        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
        // Neither streaming nor healing was requested by this workflow.
        assert_eq!(metadata["stream_requested"], Value::Bool(false));
        assert_eq!(metadata["heal_requested"], Value::Bool(false));
        // The prompt must be the rendered template, i.e. contain the email text.
        assert!(metadata["prompt"]
            .as_str()
            .expect("prompt should be a string")
            .contains("Need help with termination"));

        // Exactly one binding: the `input.email_text` placeholder, resolved
        // to the provided string.
        let bindings = metadata["bindings"]
            .as_array()
            .expect("bindings should be an array");
        assert_eq!(bindings.len(), 1);
        assert_eq!(
            bindings[0]["source_path"],
            Value::String("input.email_text".to_string())
        );
        assert_eq!(
            bindings[0]["resolved"],
            Value::String("Need help with termination".to_string())
        );
        assert_eq!(bindings[0]["missing"], Value::Bool(false));
        assert_eq!(
            bindings[0]["resolved_type"],
            Value::String("string".to_string())
        );
    }
3640
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_by_default() {
        // With default run options, the `workflow_completed` event must embed a
        // `nerdstats` metadata object mirroring the run output.
        let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // Nerdstats must agree with the returned run output.
        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
        assert_eq!(
            nerdstats["terminal_node"],
            Value::String(output.terminal_node)
        );
        assert_eq!(
            nerdstats["total_tokens"],
            Value::Number(output.total_tokens.into())
        );
        // The mock executor reported usage, so metrics are marked available.
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_usage".to_string())
        );
        // Legacy keys stay absent; `step_details` is the consolidated shape.
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert!(nerdstats.get("step_details").is_some());
        assert!(nerdstats.get("llm_node_models").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
        // Non-streaming run: no time-to-first-token.
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
3717
    #[tokio::test]
    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
        // Turning off `telemetry.nerdstats` must strip the metadata from the
        // `workflow_completed` event entirely.
        let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };
        // Same defaults as the companion test, except nerdstats disabled.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                nerdstats: false,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &options,
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        // No nerdstats -> no metadata at all on the completion event.
        assert!(completed.metadata.is_none());
    }
3767
3768    struct StreamAwareMockExecutor;
3769
3770    #[async_trait]
3771    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
3772        async fn complete_structured(
3773            &self,
3774            request: YamlLlmExecutionRequest,
3775            _event_sink: Option<&dyn YamlWorkflowEventSink>,
3776        ) -> Result<YamlLlmExecutionResult, String> {
3777            Ok(YamlLlmExecutionResult {
3778                payload: json!({"state":"ok"}),
3779                usage: Some(YamlLlmTokenUsage {
3780                    prompt_tokens: 20,
3781                    completion_tokens: 10,
3782                    total_tokens: 30,
3783                    reasoning_tokens: None,
3784                }),
3785                ttft_ms: if request.stream { Some(12) } else { None },
3786                tool_calls: Vec::new(),
3787            })
3788        }
3789    }
3790
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
        // A streaming llm_call node whose executor reports usage and TTFT must
        // still surface full token metrics in the completion nerdstats.
        let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
    config:
      prompt: |
        Return JSON only:
        {"state":"ok"}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &StreamAwareMockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // Usage was reported despite streaming, so metrics stay available and
        // the executor's TTFT (12ms for streamed requests) is propagated.
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
    }
3854
3855    #[test]
3856    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
3857        let output = YamlWorkflowRunOutput {
3858            workflow_id: "workflow".to_string(),
3859            entry_node: "start".to_string(),
3860            email_text: "hello".to_string(),
3861            trace: vec!["llm_node".to_string()],
3862            outputs: BTreeMap::new(),
3863            terminal_node: "llm_node".to_string(),
3864            terminal_output: None,
3865            step_timings: vec![YamlStepTiming {
3866                node_id: "llm_node".to_string(),
3867                node_kind: "llm_call".to_string(),
3868                model_name: Some("gpt-4.1".to_string()),
3869                elapsed_ms: 100,
3870                prompt_tokens: None,
3871                completion_tokens: None,
3872                total_tokens: None,
3873                reasoning_tokens: None,
3874                tokens_per_second: None,
3875            }],
3876            llm_node_metrics: BTreeMap::new(),
3877            llm_node_models: BTreeMap::new(),
3878            total_elapsed_ms: 100,
3879            ttft_ms: None,
3880            total_input_tokens: 0,
3881            total_output_tokens: 0,
3882            total_tokens: 0,
3883            total_reasoning_tokens: None,
3884            tokens_per_second: 0.0,
3885            trace_id: Some("trace-1".to_string()),
3886            metadata: None,
3887        };
3888
3889        let nerdstats = workflow_nerdstats(&output);
3890        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
3891        assert_eq!(
3892            nerdstats["token_metrics_source"],
3893            Value::String("provider_stream_usage_unavailable".to_string())
3894        );
3895        assert_eq!(nerdstats["total_tokens"], Value::Null);
3896        assert_eq!(nerdstats["ttft_ms"], Value::Null);
3897        assert!(nerdstats.get("step_timings").is_none());
3898        assert!(nerdstats.get("llm_node_metrics").is_none());
3899        assert_eq!(
3900            nerdstats["step_details"][0]["node_id"],
3901            Value::String("llm_node".to_string())
3902        );
3903        assert_eq!(
3904            nerdstats["step_details"][0]["model_name"],
3905            Value::String("gpt-4.1".to_string())
3906        );
3907    }
3908
3909    #[test]
3910    fn workflow_nerdstats_includes_ttft_when_available() {
3911        let output = YamlWorkflowRunOutput {
3912            workflow_id: "workflow".to_string(),
3913            entry_node: "start".to_string(),
3914            email_text: "hello".to_string(),
3915            trace: vec!["llm_node".to_string()],
3916            outputs: BTreeMap::new(),
3917            terminal_node: "llm_node".to_string(),
3918            terminal_output: None,
3919            step_timings: vec![YamlStepTiming {
3920                node_id: "llm_node".to_string(),
3921                node_kind: "llm_call".to_string(),
3922                model_name: Some("gpt-4.1".to_string()),
3923                elapsed_ms: 100,
3924                prompt_tokens: Some(10),
3925                completion_tokens: Some(15),
3926                total_tokens: Some(25),
3927                reasoning_tokens: None,
3928                tokens_per_second: Some(150.0),
3929            }],
3930            llm_node_metrics: BTreeMap::new(),
3931            llm_node_models: BTreeMap::new(),
3932            total_elapsed_ms: 100,
3933            ttft_ms: Some(42),
3934            total_input_tokens: 10,
3935            total_output_tokens: 15,
3936            total_tokens: 25,
3937            total_reasoning_tokens: None,
3938            tokens_per_second: 150.0,
3939            trace_id: Some("trace-2".to_string()),
3940            metadata: None,
3941        };
3942
3943        let nerdstats = workflow_nerdstats(&output);
3944        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
3945        assert!(nerdstats.get("step_timings").is_none());
3946        assert!(nerdstats.get("llm_node_metrics").is_none());
3947        assert_eq!(
3948            nerdstats["step_details"][0]["node_id"],
3949            Value::String("llm_node".to_string())
3950        );
3951        assert_eq!(
3952            nerdstats["step_details"][0]["model_name"],
3953            Value::String("gpt-4.1".to_string())
3954        );
3955    }
3956
    #[test]
    fn workflow_nerdstats_schema_contract_is_stable() {
        // Golden test: pins the exact JSON shape emitted by workflow_nerdstats
        // so downstream consumers can rely on the schema. Any key rename or
        // restructure must consciously update this expected value.
        let output = YamlWorkflowRunOutput {
            workflow_id: "schema-workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["classify".to_string(), "route".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "route".to_string(),
            terminal_output: None,
            step_timings: vec![
                // An llm_call step with full usage, including reasoning tokens.
                YamlStepTiming {
                    node_id: "classify".to_string(),
                    node_kind: "llm_call".to_string(),
                    model_name: Some("gpt-4.1".to_string()),
                    elapsed_ms: 100,
                    prompt_tokens: Some(11),
                    completion_tokens: Some(22),
                    total_tokens: Some(33),
                    reasoning_tokens: Some(7),
                    tokens_per_second: Some(220.0),
                },
                // A non-LLM step: no model and no token fields.
                YamlStepTiming {
                    node_id: "route".to_string(),
                    node_kind: "switch".to_string(),
                    model_name: None,
                    elapsed_ms: 0,
                    prompt_tokens: None,
                    completion_tokens: None,
                    total_tokens: None,
                    reasoning_tokens: None,
                    tokens_per_second: None,
                },
            ],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: Some(9),
            total_input_tokens: 11,
            total_output_tokens: 22,
            total_tokens: 33,
            total_reasoning_tokens: Some(7),
            tokens_per_second: 220.0,
            trace_id: Some("trace-schema".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        // Note: None-valued step fields are omitted entirely (see the "route"
        // entry), not serialized as null.
        let expected = json!({
            "workflow_id": "schema-workflow",
            "terminal_node": "route",
            "total_elapsed_ms": 100,
            "ttft_ms": 9,
            "step_details": [
                {
                    "node_id": "classify",
                    "node_kind": "llm_call",
                    "model_name": "gpt-4.1",
                    "elapsed_ms": 100,
                    "prompt_tokens": 11,
                    "completion_tokens": 22,
                    "total_tokens": 33,
                    "reasoning_tokens": 7,
                    "tokens_per_second": 220.0
                },
                {
                    "node_id": "route",
                    "node_kind": "switch",
                    "elapsed_ms": 0
                }
            ],
            "total_input_tokens": 11,
            "total_output_tokens": 22,
            "total_tokens": 33,
            "total_reasoning_tokens": 7,
            "tokens_per_second": 220.0,
            "trace_id": "trace-schema",
            "token_metrics_available": true,
            "token_metrics_source": "provider_usage",
            "llm_nodes_without_usage": []
        });

        assert_eq!(nerdstats, expected);
    }
4041
4042    struct MessageHistoryExecutor;
4043
4044    #[async_trait]
4045    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
4046        async fn complete_structured(
4047            &self,
4048            request: YamlLlmExecutionRequest,
4049            _event_sink: Option<&dyn YamlWorkflowEventSink>,
4050        ) -> Result<YamlLlmExecutionResult, String> {
4051            let messages = request
4052                .messages
4053                .ok_or_else(|| "expected messages in request".to_string())?;
4054            if messages.len() != 2 {
4055                return Err(format!("expected 2 messages, got {}", messages.len()));
4056            }
4057            Ok(YamlLlmExecutionResult {
4058                payload: json!({"category":"termination","reason":"history"}),
4059                usage: Some(YamlLlmTokenUsage {
4060                    prompt_tokens: 7,
4061                    completion_tokens: 3,
4062                    total_tokens: 10,
4063                    reasoning_tokens: None,
4064                }),
4065                ttft_ms: None,
4066                tool_calls: Vec::new(),
4067            })
4068        }
4069    }
4070
    #[tokio::test]
    async fn supports_messages_path_in_workflow_input() {
        // With `messages_path` set and `append_prompt_as_user: false`, the
        // executor must receive exactly the chat history from the input.
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // `email_text` is present but should be ignored in favor of `messages`.
        let input = json!({
            "email_text": "ignored",
            "messages": [
                {"role": "system", "content": "You are a classifier"},
                {"role": "user", "content": "Please classify this"}
            ]
        });

        // MessageHistoryExecutor errors unless it sees exactly two messages,
        // so a successful run proves the history was forwarded.
        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
            .await
            .expect("workflow should use chat history from input");

        assert_eq!(output.terminal_node, "classify_top_level");
        assert_eq!(
            output.outputs["classify_top_level"]["output"]["reason"],
            Value::String("history".to_string())
        );
    }
4104
    #[tokio::test]
    async fn wrapper_entrypoints_produce_equivalent_outputs() {
        // The convenience wrappers must be pure delegations: the base
        // entrypoint, the custom-worker wrapper, and the events/options
        // wrapper all have to yield the same result for the same input.
        let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let input = json!({"email_text":"hello"});

        let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
            .await
            .expect("base entrypoint should execute");
        let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
            .await
            .expect("custom worker wrapper should execute");
        let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &input,
            &MockExecutor,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("events/options wrapper should execute");

        // Compare the observable result surface pairwise against `a`.
        assert_eq!(a.workflow_id, b.workflow_id);
        assert_eq!(a.workflow_id, c.workflow_id);
        assert_eq!(a.terminal_node, b.terminal_node);
        assert_eq!(a.terminal_node, c.terminal_node);
        assert_eq!(a.outputs, b.outputs);
        assert_eq!(a.outputs, c.outputs);
        assert_eq!(a.total_tokens, b.total_tokens);
        assert_eq!(a.total_tokens, c.total_tokens);
    }
4150
    #[tokio::test]
    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
        // Full tool-calling loop through a real SimpleAgentsClient: the first
        // node calls a tool (served by FixedToolWorker), records the call under
        // the `audit` global, and the second node reads that global in its
        // prompt template.
        let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Write an email greeting for {{ globals.audit.0.output.customer_name }}.
      output_schema:
        type: object
        properties:
          subject: { type: string }
          body: { type: string }
        required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ToolLoopProvider scripts the provider side of the tool roundtrip.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
        // The tool call trace must capture the worker's output verbatim.
        assert_eq!(
            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
            Value::String("Ava".to_string())
        );
        // The globals reference resolved: the greeting includes the name the
        // tool returned.
        let body = output.outputs["personalize"]["output"]["body"]
            .as_str()
            .expect("body should be string");
        assert!(body.contains("Ava"));
    }
4233
    #[tokio::test]
    async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
        // Reasoning-token usage reported by the provider must survive into both
        // the run output and the nerdstats JSON.
        let yaml = r#"
id: reasoning-usage-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Return JSON only:
        {"state":"ok"}
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ReasoningUsageProvider reports reasoning tokens in its usage block.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ReasoningUsageProvider))
            .build()
            .expect("client should build");

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        // Reasoning tokens show up both as the run total and per step.
        assert_eq!(output.total_reasoning_tokens, Some(4));
        assert_eq!(output.step_timings.len(), 1);
        assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));

        // ...and are mirrored into nerdstats.
        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(
            nerdstats["total_reasoning_tokens"],
            Value::Number(4u64.into())
        );
        assert_eq!(
            nerdstats["step_details"][0]["reasoning_tokens"],
            Value::Number(4u64.into())
        );
    }
4286
    #[tokio::test]
    async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
        // A tool roundtrip means multiple provider completions for one node;
        // their reasoning tokens must be summed, not overwritten.
        let yaml = r#"
id: tool-reasoning-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ToolLoopReasoningProvider scripts the tool roundtrip and reports
        // reasoning tokens on its completions.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopReasoningProvider))
            .build()
            .expect("client should build");
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        // The single node's timing carries the accumulated reasoning total.
        assert_eq!(output.total_reasoning_tokens, Some(5));
        assert_eq!(output.step_timings.len(), 1);
        assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));

        // Nerdstats mirrors the same accumulated values.
        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(
            nerdstats["total_reasoning_tokens"],
            Value::Number(5u64.into())
        );
        assert_eq!(
            nerdstats["step_details"][0]["reasoning_tokens"],
            Value::Number(5u64.into())
        );
    }
4355
4356    #[tokio::test]
4357    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
4358        let yaml = r#"
4359id: tool-calling-schema-fail
4360entry_node: generate_with_tool
4361nodes:
4362  - id: generate_with_tool
4363    node_type:
4364      llm_call:
4365        model: gpt-4.1
4366        tools_format: simplified
4367        max_tool_roundtrips: 1
4368        tools:
4369          - name: get_customer_context
4370            input_schema:
4371              type: object
4372              properties:
4373                order_id: { type: string }
4374              required: [order_id]
4375              additionalProperties: false
4376            output_schema:
4377              type: object
4378              properties:
4379                customer_name: { type: string }
4380              required: [customer_name]
4381              additionalProperties: false
4382    config:
4383      output_schema:
4384        type: object
4385        properties:
4386          state: { type: string }
4387        required: [state]
4388"#;
4389
4390        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4391        let client = SimpleAgentsClientBuilder::new()
4392            .with_provider(Arc::new(ToolLoopProvider))
4393            .build()
4394            .expect("client should build");
4395        let worker = FixedToolWorker {
4396            payload: json!({"unexpected": "shape"}),
4397        };
4398
4399        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4400            &workflow,
4401            &json!({"email_text":"hello"}),
4402            &client,
4403            Some(&worker),
4404            None,
4405            &YamlWorkflowRunOptions::default(),
4406        )
4407        .await
4408        .expect_err("workflow should hard-fail on schema mismatch");
4409
4410        match error {
4411            YamlWorkflowRunError::Llm { message, .. } => {
4412                assert!(message.contains("output failed schema validation"));
4413            }
4414            other => panic!("expected llm error, got {other:?}"),
4415        }
4416    }
4417
4418    #[tokio::test]
4419    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
4420        let yaml = r#"
4421id: unknown-tool-rejected
4422entry_node: generate_with_tool
4423nodes:
4424  - id: generate_with_tool
4425    node_type:
4426      llm_call:
4427        model: gpt-4.1
4428        tools_format: simplified
4429        max_tool_roundtrips: 1
4430        tools:
4431          - name: get_customer_context
4432            input_schema:
4433              type: object
4434              properties:
4435                order_id: { type: string }
4436              required: [order_id]
4437    config:
4438      output_schema:
4439        type: object
4440        properties:
4441          state: { type: string }
4442        required: [state]
4443"#;
4444
4445        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4446        let client = SimpleAgentsClientBuilder::new()
4447            .with_provider(Arc::new(UnknownToolProvider))
4448            .build()
4449            .expect("client should build");
4450        let worker = CountingToolWorker {
4451            execute_calls: AtomicUsize::new(0),
4452        };
4453
4454        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4455            &workflow,
4456            &json!({"email_text":"hello"}),
4457            &client,
4458            Some(&worker),
4459            None,
4460            &YamlWorkflowRunOptions::default(),
4461        )
4462        .await
4463        .expect_err("workflow should reject unknown tool before executing worker");
4464
4465        match error {
4466            YamlWorkflowRunError::Llm { message, .. } => {
4467                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
4468            }
4469            other => panic!("expected llm error, got {other:?}"),
4470        }
4471
4472        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
4473    }
4474
4475    #[test]
4476    fn validates_tools_format_mismatch() {
4477        let yaml = r#"
4478id: mismatch
4479entry_node: generate
4480nodes:
4481  - id: generate
4482    node_type:
4483      llm_call:
4484        model: gpt-4.1
4485        tools_format: openai
4486        tools:
4487          - name: get_customer_context
4488            input_schema:
4489              type: object
4490              properties:
4491                order_id: { type: string }
4492              required: [order_id]
4493"#;
4494
4495        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4496        let diagnostics = verify_yaml_workflow(&workflow);
4497        assert!(diagnostics
4498            .iter()
4499            .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
4500    }
4501
4502    #[tokio::test]
4503    async fn custom_worker_node_requires_executor() {
4504        let yaml = r#"
4505id: missing-custom-worker
4506entry_node: enrich
4507nodes:
4508  - id: enrich
4509    node_type:
4510      custom_worker:
4511        handler: GetEmployeeRecord
4512"#;
4513
4514        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4515        let client = SimpleAgentsClientBuilder::new()
4516            .with_provider(Arc::new(ToolLoopProvider))
4517            .build()
4518            .expect("client should build");
4519
4520        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
4521            &workflow,
4522            &json!({"email_text": "hello"}),
4523            &client,
4524            None,
4525            None,
4526            &YamlWorkflowRunOptions::default(),
4527        )
4528        .await
4529        .expect_err("workflow should fail without custom worker executor");
4530
4531        match error {
4532            YamlWorkflowRunError::CustomWorker { node_id, message } => {
4533                assert_eq!(node_id, "enrich");
4534                assert!(message.contains("requires a configured custom worker executor"));
4535            }
4536            other => panic!("expected custom worker error, got {other:?}"),
4537        }
4538    }
4539
4540    #[tokio::test]
4541    async fn custom_worker_receives_trace_context_block() {
4542        let yaml = r#"
4543id: custom-worker-trace-context
4544entry_node: lookup
4545nodes:
4546  - id: lookup
4547    node_type:
4548      custom_worker:
4549        handler: GetRagData
4550    config:
4551      payload:
4552        topic: demo
4553"#;
4554
4555        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4556        let worker = CapturingWorker {
4557            context: Mutex::new(None),
4558        };
4559        let options = YamlWorkflowRunOptions {
4560            trace: YamlWorkflowTraceOptions {
4561                context: Some(YamlWorkflowTraceContextInput {
4562                    trace_id: Some("trace-fixed-ctx".to_string()),
4563                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
4564                    ..YamlWorkflowTraceContextInput::default()
4565                }),
4566                tenant: YamlWorkflowTraceTenantContext {
4567                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
4568                    request_id: Some("turn-7".to_string()),
4569                    ..YamlWorkflowTraceTenantContext::default()
4570                },
4571            },
4572            ..YamlWorkflowRunOptions::default()
4573        };
4574
4575        run_workflow_yaml_with_custom_worker_and_events_and_options(
4576            &workflow,
4577            &json!({"email_text":"hello"}),
4578            &MockExecutor,
4579            Some(&worker),
4580            None,
4581            &options,
4582        )
4583        .await
4584        .expect("workflow should execute");
4585
4586        let captured_context = worker
4587            .context
4588            .lock()
4589            .expect("capturing worker lock should not be poisoned")
4590            .clone()
4591            .expect("custom worker should receive context");
4592
4593        assert_eq!(
4594            captured_context
4595                .get("trace")
4596                .and_then(|trace| trace.get("context"))
4597                .and_then(|context| context.get("trace_id"))
4598                .and_then(Value::as_str),
4599            Some("trace-fixed-ctx")
4600        );
4601        assert_eq!(
4602            captured_context
4603                .get("trace")
4604                .and_then(|trace| trace.get("context"))
4605                .and_then(|context| context.get("traceparent"))
4606                .and_then(Value::as_str),
4607            Some("00-trace-fixed-ctx-span-fixed-01")
4608        );
4609        assert_eq!(
4610            captured_context
4611                .get("trace")
4612                .and_then(|trace| trace.get("tenant"))
4613                .and_then(|tenant| tenant.get("conversation_id"))
4614                .and_then(Value::as_str),
4615            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
4616        );
4617    }
4618
4619    #[tokio::test]
4620    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
4621        let yaml = r#"
4622id: cancellation-test
4623entry_node: classify
4624nodes:
4625  - id: classify
4626    node_type:
4627      llm_call:
4628        model: gpt-4.1
4629    config:
4630      prompt: |
4631        Classify this email into exactly one category:
4632        {{ input.email_text }}
4633"#;
4634
4635        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4636        let executor = CountingExecutor {
4637            call_count: AtomicUsize::new(0),
4638        };
4639        let sink = CancelAfterFirstEventSink {
4640            cancelled: AtomicBool::new(false),
4641        };
4642
4643        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
4644            &workflow,
4645            &json!({"email_text":"hello"}),
4646            &executor,
4647            None,
4648            Some(&sink),
4649            &YamlWorkflowRunOptions::default(),
4650        )
4651        .await
4652        .expect_err("workflow should stop when sink signals cancellation");
4653
4654        assert!(matches!(
4655            err,
4656            YamlWorkflowRunError::EventSinkCancelled { .. }
4657        ));
4658        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
4659    }
4660
4661    #[tokio::test]
4662    async fn rejects_invalid_messages_path_shape() {
4663        let yaml = r#"
4664id: email-intake-classification
4665entry_node: classify_top_level
4666nodes:
4667  - id: classify_top_level
4668    node_type:
4669      llm_call:
4670        model: gpt-4.1
4671        messages_path: input.messages
4672"#;
4673
4674        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4675        let input = json!({
4676            "email_text": "ignored",
4677            "messages": "not-a-list"
4678        });
4679
4680        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
4681            .await
4682            .expect_err("workflow should fail for invalid messages shape");
4683
4684        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
4685    }
4686
4687    #[test]
4688    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
4689        let yaml = r#"
4690id: chat-workflow
4691entry_node: decide
4692nodes:
4693  - id: decide
4694    node_type:
4695      switch:
4696        branches:
4697          - condition: '$.input.mode == "draft"'
4698            target: draft
4699        default: ask
4700  - id: draft
4701    node_type:
4702      llm_call:
4703        model: gpt-4.1
4704  - id: ask
4705    node_type:
4706      llm_call:
4707        model: gpt-4.1
4708edges:
4709  - from: draft
4710    to: ask
4711"#;
4712
4713        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4714        let mermaid = yaml_workflow_to_mermaid(&workflow);
4715
4716        assert!(mermaid.contains("flowchart TD"));
4717        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
4718        assert!(mermaid.contains("decide -- \"default\" --> ask"));
4719        assert!(mermaid.contains("draft --> ask"));
4720    }
4721
4722    #[test]
4723    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
4724        let yaml = r#"
4725id: tool-graph
4726entry_node: chat
4727nodes:
4728  - id: chat
4729    node_type:
4730      llm_call:
4731        model: gemini-3-flash
4732        tools_format: simplified
4733        tools:
4734          - name: run_workflow_graph
4735            input_schema:
4736              type: object
4737              properties:
4738                workflow_id: { type: string }
4739              required: [workflow_id]
4740edges: []
4741"#;
4742
4743        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4744        let mermaid = yaml_workflow_to_mermaid(&workflow);
4745
4746        assert!(mermaid.contains("chat__tool_0"));
4747        assert!(mermaid.contains("chat -.-> chat__tool_0"));
4748        assert!(mermaid.contains("classDef toolNode"));
4749        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
4750    }
4751
4752    #[test]
4753    fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
4754        let base_dir = std::env::temp_dir().join(format!(
4755            "simple_agents_mermaid_subgraph_{}",
4756            std::time::SystemTime::now()
4757                .duration_since(std::time::UNIX_EPOCH)
4758                .expect("unix epoch")
4759                .as_nanos()
4760        ));
4761        fs::create_dir_all(&base_dir).expect("temp dir should be created");
4762
4763        let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
4764        let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
4765
4766        let orchestrator_yaml = r#"
4767id: email-chat-orchestrator-with-subgraph-tool
4768entry_node: respond_casual
4769nodes:
4770  - id: respond_casual
4771    node_type:
4772      llm_call:
4773        model: gemini-3-flash
4774        tools_format: simplified
4775        tools:
4776          - name: run_workflow_graph
4777            input_schema:
4778              type: object
4779              properties:
4780                workflow_id: { type: string }
4781              required: [workflow_id]
4782    config:
4783      prompt: |
4784        Call with:
4785        {
4786          "workflow_id": "hr-warning-email-subgraph"
4787        }
4788edges: []
4789"#;
4790
4791        let subgraph_yaml = r#"
4792id: hr-warning-email-subgraph
4793entry_node: draft_hr_warning_email
4794nodes:
4795  - id: draft_hr_warning_email
4796    node_type:
4797      llm_call:
4798        model: gemini-3-flash
4799edges: []
4800"#;
4801
4802        fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
4803        fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
4804
4805        let mermaid =
4806            yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
4807
4808        assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
4809        assert!(mermaid.contains("Subgraph: hr-warning-email-subgraph"));
4810        assert!(mermaid.contains("calls hr-warning-email-subgraph"));
4811        assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
4812
4813        fs::remove_dir_all(base_dir).expect("temp dir removed");
4814    }
4815
4816    #[tokio::test]
4817    async fn rejects_non_yaml_workflow_file_extension() {
4818        let file_path = std::env::temp_dir().join(format!(
4819            "simple_agents_workflow_{}.txt",
4820            std::time::SystemTime::now()
4821                .duration_since(std::time::UNIX_EPOCH)
4822                .expect("unix epoch")
4823                .as_nanos()
4824        ));
4825        fs::write(&file_path, "not yaml").expect("temporary file should be created");
4826
4827        let result =
4828            run_workflow_yaml_file(&file_path, &json!({"email_text": "hello"}), &MockExecutor)
4829                .await;
4830
4831        match result {
4832            Err(YamlWorkflowRunError::FileRejected { path, reason }) => {
4833                assert!(path.ends_with(".txt"));
4834                assert!(reason.contains(".yaml or .yml"));
4835            }
4836            other => panic!("expected file rejection error, got {other:?}"),
4837        }
4838
4839        let _ = fs::remove_file(&file_path);
4840    }
4841
4842    #[test]
4843    fn converts_yaml_workflow_to_ir_definition() {
4844        let yaml = r#"
4845id: chat-workflow
4846entry_node: classify
4847nodes:
4848  - id: classify
4849    node_type:
4850      llm_call:
4851        model: gpt-4.1
4852    config:
4853      prompt: |
4854        classify
4855  - id: route
4856    node_type:
4857      switch:
4858        branches:
4859          - condition: '$.nodes.classify.output.kind == "x"'
4860            target: done
4861        default: done
4862  - id: done
4863    node_type:
4864      custom_worker:
4865        handler: GetRagData
4866    config:
4867      payload:
4868        topic: test
4869edges:
4870  - from: classify
4871    to: route
4872"#;
4873
4874        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4875        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
4876
4877        assert_eq!(ir.name, "chat-workflow");
4878        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
4879        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
4880        assert!(ir.nodes.iter().any(|n| n.id == "route"));
4881        assert!(ir.nodes.iter().any(|n| n.id == "done"));
4882    }
4883
4884    #[test]
4885    fn supports_yaml_to_ir_when_messages_path_is_used() {
4886        let yaml = r#"
4887id: chat-workflow
4888entry_node: classify
4889nodes:
4890  - id: classify
4891    node_type:
4892      llm_call:
4893        model: gpt-4.1
4894        messages_path: input.messages
4895"#;
4896
4897        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4898        let ir =
4899            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
4900        assert!(ir.nodes.iter().any(|node| matches!(
4901            node.kind,
4902            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
4903        )));
4904    }
4905
4906    #[test]
4907    fn yaml_to_ir_preserves_llm_sampling_controls() {
4908        let yaml = r#"
4909id: llm-controls
4910entry_node: classify
4911nodes:
4912  - id: classify
4913    node_type:
4914      llm_call:
4915        model: gpt-4.1
4916        max_tokens: 120
4917        temperature: 0.3
4918        top_p: 0.9
4919    config:
4920      prompt: "classify"
4921"#;
4922
4923        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4924        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
4925        let llm_tool_input = ir
4926            .nodes
4927            .iter()
4928            .find_map(|node| {
4929                if let crate::ir::NodeKind::Tool { tool, input, .. } = &node.kind {
4930                    if tool == "__yaml_llm_call" {
4931                        return Some(input);
4932                    }
4933                }
4934                None
4935            })
4936            .expect("expected __yaml_llm_call tool node");
4937
4938        assert_eq!(
4939            llm_tool_input.get("max_tokens").and_then(Value::as_u64),
4940            Some(120)
4941        );
4942        let temperature = llm_tool_input
4943            .get("temperature")
4944            .and_then(Value::as_f64)
4945            .expect("temperature should be preserved");
4946        assert!((temperature - 0.3).abs() < 1e-6);
4947        let top_p = llm_tool_input
4948            .get("top_p")
4949            .and_then(Value::as_f64)
4950            .expect("top_p should be preserved");
4951        assert!((top_p - 0.9).abs() < 1e-6);
4952    }
4953
4954    #[tokio::test]
4955    async fn ir_custom_worker_path_preserves_handler_file() {
4956        let yaml = r#"
4957id: worker-handler-file
4958entry_node: worker
4959nodes:
4960  - id: worker
4961    node_type:
4962      custom_worker:
4963        handler: GetRagData
4964        handler_file: handlers.py
4965    config:
4966      payload:
4967        topic: clarification
4968"#;
4969
4970        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4971        let worker = HandlerFileCapturingWorker {
4972            handler_file: Mutex::new(None),
4973        };
4974
4975        run_workflow_yaml_with_custom_worker_and_events_and_options(
4976            &workflow,
4977            &json!({"email_text":"hello"}),
4978            &MockExecutor,
4979            Some(&worker),
4980            None,
4981            &YamlWorkflowRunOptions::default(),
4982        )
4983        .await
4984        .expect("workflow should execute");
4985
4986        let captured = worker
4987            .handler_file
4988            .lock()
4989            .expect("handler-file lock should not be poisoned")
4990            .clone();
4991        assert_eq!(captured.as_deref(), Some("handlers.py"));
4992    }
4993
4994    #[tokio::test]
4995    async fn workflow_output_contains_trace_id_in_both_locations() {
4996        let yaml = r#"
4997id: trace-test
4998entry_node: classify
4999nodes:
5000  - id: classify
5001    node_type:
5002      llm_call:
5003        model: gpt-4.1
5004    config:
5005      prompt: |
5006        Classify this email into exactly one category:
5007        {{ input.email_text }}
5008"#;
5009
5010        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5011        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
5012            .await
5013            .expect("workflow should execute");
5014
5015        let trace_id = output
5016            .trace_id
5017            .as_deref()
5018            .expect("trace_id should be present");
5019        assert!(!trace_id.is_empty());
5020        assert_eq!(
5021            output.metadata.as_ref().and_then(|value| {
5022                value
5023                    .get("telemetry")
5024                    .and_then(|telemetry| telemetry.get("trace_id"))
5025                    .and_then(Value::as_str)
5026            }),
5027            Some(trace_id)
5028        );
5029    }
5030
5031    #[tokio::test]
5032    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
5033        let yaml = r#"
5034id: sample-rate-zero
5035entry_node: classify
5036nodes:
5037  - id: classify
5038    node_type:
5039      llm_call:
5040        model: gpt-4.1
5041    config:
5042      prompt: |
5043        Classify this email into exactly one category:
5044        {{ input.email_text }}
5045"#;
5046
5047        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5048        let options = YamlWorkflowRunOptions {
5049            telemetry: YamlWorkflowTelemetryConfig {
5050                sample_rate: 0.0,
5051                ..YamlWorkflowTelemetryConfig::default()
5052            },
5053            trace: YamlWorkflowTraceOptions {
5054                context: Some(YamlWorkflowTraceContextInput {
5055                    trace_id: Some("trace-sample-zero".to_string()),
5056                    ..YamlWorkflowTraceContextInput::default()
5057                }),
5058                tenant: YamlWorkflowTraceTenantContext::default(),
5059            },
5060            model: None,
5061        };
5062
5063        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5064            &workflow,
5065            &json!({"email_text":"hello"}),
5066            &MockExecutor,
5067            None,
5068            None,
5069            &options,
5070        )
5071        .await
5072        .expect("workflow should execute");
5073
5074        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
5075        assert_eq!(
5076            output
5077                .metadata
5078                .as_ref()
5079                .and_then(|value| value.get("telemetry"))
5080                .and_then(|telemetry| telemetry.get("sampled"))
5081                .and_then(Value::as_bool),
5082            Some(false)
5083        );
5084    }
5085
5086    #[test]
5087    fn trace_id_from_traceparent_parses_w3c_header() {
5088        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
5089        assert_eq!(
5090            trace_id_from_traceparent(traceparent).as_deref(),
5091            Some("4bf92f3577b34da6a3ce929d0e0e4736")
5092        );
5093    }
5094
5095    #[test]
5096    fn resolve_telemetry_context_marks_traceparent_source() {
5097        let mut options = YamlWorkflowRunOptions::default();
5098        options.trace.context = Some(YamlWorkflowTraceContextInput {
5099            traceparent: Some(
5100                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
5101            ),
5102            ..YamlWorkflowTraceContextInput::default()
5103        });
5104
5105        let context = resolve_telemetry_context(&options, None);
5106
5107        assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
5108        assert_eq!(
5109            context.trace_id.as_deref(),
5110            Some("4bf92f3577b34da6a3ce929d0e0e4736")
5111        );
5112    }
5113
5114    #[tokio::test]
5115    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
5116        let yaml = r#"
5117id: traceparent-derived
5118entry_node: classify
5119nodes:
5120  - id: classify
5121    node_type:
5122      llm_call:
5123        model: gpt-4.1
5124    config:
5125      prompt: |
5126        Classify this email into exactly one category:
5127        {{ input.email_text }}
5128"#;
5129
5130        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5131        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
5132        let options = YamlWorkflowRunOptions {
5133            telemetry: YamlWorkflowTelemetryConfig {
5134                sample_rate: 0.0,
5135                ..YamlWorkflowTelemetryConfig::default()
5136            },
5137            trace: YamlWorkflowTraceOptions {
5138                context: Some(YamlWorkflowTraceContextInput {
5139                    traceparent: Some(traceparent.to_string()),
5140                    ..YamlWorkflowTraceContextInput::default()
5141                }),
5142                tenant: YamlWorkflowTraceTenantContext::default(),
5143            },
5144            model: None,
5145        };
5146
5147        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5148            &workflow,
5149            &json!({"email_text":"hello"}),
5150            &MockExecutor,
5151            None,
5152            None,
5153            &options,
5154        )
5155        .await
5156        .expect("workflow should execute");
5157
5158        assert_eq!(
5159            output.trace_id.as_deref(),
5160            Some("4bf92f3577b34da6a3ce929d0e0e4736")
5161        );
5162        assert_eq!(
5163            output
5164                .metadata
5165                .as_ref()
5166                .and_then(|value| value.get("telemetry"))
5167                .and_then(|telemetry| telemetry.get("sampled"))
5168                .and_then(Value::as_bool),
5169            Some(false)
5170        );
5171    }
5172
5173    #[tokio::test]
5174    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
5175        let yaml = r#"
5176id: sample-rate-one
5177entry_node: classify
5178nodes:
5179  - id: classify
5180    node_type:
5181      llm_call:
5182        model: gpt-4.1
5183    config:
5184      prompt: |
5185        Classify this email into exactly one category:
5186        {{ input.email_text }}
5187"#;
5188
5189        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5190        let options = YamlWorkflowRunOptions {
5191            telemetry: YamlWorkflowTelemetryConfig {
5192                sample_rate: 1.0,
5193                ..YamlWorkflowTelemetryConfig::default()
5194            },
5195            trace: YamlWorkflowTraceOptions {
5196                context: Some(YamlWorkflowTraceContextInput {
5197                    trace_id: Some("trace-sample-one".to_string()),
5198                    ..YamlWorkflowTraceContextInput::default()
5199                }),
5200                tenant: YamlWorkflowTraceTenantContext::default(),
5201            },
5202            model: None,
5203        };
5204
5205        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5206            &workflow,
5207            &json!({"email_text":"hello"}),
5208            &MockExecutor,
5209            None,
5210            None,
5211            &options,
5212        )
5213        .await
5214        .expect("workflow should execute");
5215
5216        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
5217        assert_eq!(
5218            output
5219                .metadata
5220                .as_ref()
5221                .and_then(|value| value.get("telemetry"))
5222                .and_then(|telemetry| telemetry.get("sampled"))
5223                .and_then(Value::as_bool),
5224            Some(true)
5225        );
5226    }
5227
5228    #[tokio::test]
5229    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
5230        let yaml = r#"
5231id: sample-rate-deterministic
5232entry_node: classify
5233nodes:
5234  - id: classify
5235    node_type:
5236      llm_call:
5237        model: gpt-4.1
5238    config:
5239      prompt: |
5240        Classify this email into exactly one category:
5241        {{ input.email_text }}
5242"#;
5243
5244        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5245        let options = YamlWorkflowRunOptions {
5246            telemetry: YamlWorkflowTelemetryConfig {
5247                sample_rate: 0.5,
5248                ..YamlWorkflowTelemetryConfig::default()
5249            },
5250            trace: YamlWorkflowTraceOptions {
5251                context: Some(YamlWorkflowTraceContextInput {
5252                    trace_id: Some("trace-sample-deterministic".to_string()),
5253                    ..YamlWorkflowTraceContextInput::default()
5254                }),
5255                tenant: YamlWorkflowTraceTenantContext::default(),
5256            },
5257            model: None,
5258        };
5259
5260        let mut sampled_values = Vec::new();
5261        for _ in 0..3 {
5262            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5263                &workflow,
5264                &json!({"email_text":"hello"}),
5265                &MockExecutor,
5266                None,
5267                None,
5268                &options,
5269            )
5270            .await
5271            .expect("workflow should execute");
5272
5273            let sampled = output
5274                .metadata
5275                .as_ref()
5276                .and_then(|value| value.get("telemetry"))
5277                .and_then(|telemetry| telemetry.get("sampled"))
5278                .and_then(Value::as_bool)
5279                .expect("sampled flag should be present");
5280            sampled_values.push(sampled);
5281        }
5282
5283        assert!(sampled_values
5284            .iter()
5285            .all(|value| *value == sampled_values[0]));
5286    }
5287
5288    #[tokio::test]
5289    async fn workflow_run_options_reject_invalid_sample_rate() {
5290        let yaml = r#"
5291id: sample-rate-invalid
5292entry_node: classify
5293nodes:
5294  - id: classify
5295    node_type:
5296      llm_call:
5297        model: gpt-4.1
5298    config:
5299      prompt: |
5300        Classify this email into exactly one category:
5301        {{ input.email_text }}
5302"#;
5303
5304        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5305        let options = YamlWorkflowRunOptions {
5306            telemetry: YamlWorkflowTelemetryConfig {
5307                sample_rate: 1.1,
5308                ..YamlWorkflowTelemetryConfig::default()
5309            },
5310            ..YamlWorkflowRunOptions::default()
5311        };
5312
5313        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
5314            &workflow,
5315            &json!({"email_text":"hello"}),
5316            &MockExecutor,
5317            None,
5318            None,
5319            &options,
5320        )
5321        .await
5322        .expect_err("invalid sample_rate should fail");
5323
5324        match err {
5325            YamlWorkflowRunError::InvalidInput { message } => {
5326                assert!(message.contains("telemetry.sample_rate"));
5327            }
5328            _ => panic!("expected invalid input error for sample_rate"),
5329        }
5330    }
5331
5332    #[tokio::test]
5333    async fn workflow_run_options_reject_nan_sample_rate() {
5334        let yaml = r#"
5335id: sample-rate-invalid
5336entry_node: classify
5337nodes:
5338  - id: classify
5339    node_type:
5340      llm_call:
5341        model: gpt-4.1
5342    config:
5343      prompt: |
5344        Classify this email into exactly one category:
5345        {{ input.email_text }}
5346"#;
5347
5348        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5349        let options = YamlWorkflowRunOptions {
5350            telemetry: YamlWorkflowTelemetryConfig {
5351                sample_rate: f32::NAN,
5352                ..YamlWorkflowTelemetryConfig::default()
5353            },
5354            ..YamlWorkflowRunOptions::default()
5355        };
5356
5357        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
5358            &workflow,
5359            &json!({"email_text":"hello"}),
5360            &MockExecutor,
5361            None,
5362            None,
5363            &options,
5364        )
5365        .await
5366        .expect_err("nan sample_rate should fail");
5367
5368        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5369    }
5370
5371    #[test]
5372    fn subworkflow_options_inherit_parent_telemetry_configuration() {
5373        let mut parent_options = YamlWorkflowRunOptions::default();
5374        parent_options.telemetry.sample_rate = 0.0;
5375        parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
5376        parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
5377        parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
5378
5379        let parent_context = TraceContext {
5380            trace_id: Some("trace-parent".to_string()),
5381            span_id: Some("span-parent".to_string()),
5382            parent_span_id: None,
5383            traceparent: Some("00-trace-parent-span-parent-01".to_string()),
5384            tracestate: None,
5385            baggage: BTreeMap::new(),
5386        };
5387
5388        let subworkflow_options =
5389            build_subworkflow_options(&parent_options, Some(&parent_context), None);
5390
5391        assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
5392        assert_eq!(
5393            subworkflow_options.telemetry.payload_mode,
5394            YamlWorkflowPayloadMode::RedactedPayload
5395        );
5396        assert_eq!(
5397            subworkflow_options.telemetry.tool_trace_mode,
5398            YamlToolTraceMode::Redacted
5399        );
5400        assert_eq!(
5401            subworkflow_options.trace.tenant.conversation_id.as_deref(),
5402            Some("conv-123")
5403        );
5404        assert_eq!(
5405            subworkflow_options
5406                .trace
5407                .context
5408                .as_ref()
5409                .and_then(|ctx| ctx.trace_id.as_deref()),
5410            Some("trace-parent")
5411        );
5412    }
5413
5414    #[test]
5415    fn trace_tenant_attributes_include_langfuse_aliases() {
5416        let tenant = YamlWorkflowTraceTenantContext {
5417            workspace_id: Some("ws-1".to_string()),
5418            user_id: Some("user-1".to_string()),
5419            conversation_id: Some("conv-1".to_string()),
5420            request_id: Some("req-1".to_string()),
5421            run_id: Some("run-1".to_string()),
5422        };
5423        let mut span = CapturingSpan::default();
5424        apply_trace_tenant_attributes_from_tenant(&mut span, &tenant);
5425
5426        assert_eq!(
5427            span.attributes
5428                .get("tenant.workspace_id")
5429                .map(String::as_str),
5430            Some("ws-1")
5431        );
5432        assert_eq!(
5433            span.attributes.get("tenant.user_id").map(String::as_str),
5434            Some("user-1")
5435        );
5436        assert_eq!(
5437            span.attributes
5438                .get("tenant.conversation_id")
5439                .map(String::as_str),
5440            Some("conv-1")
5441        );
5442        assert_eq!(
5443            span.attributes.get("langfuse.user.id").map(String::as_str),
5444            Some("user-1")
5445        );
5446        assert_eq!(
5447            span.attributes
5448                .get("langfuse.session.id")
5449                .map(String::as_str),
5450            Some("conv-1")
5451        );
5452    }
5453
    /// When nerdstats emission is enabled (third argument `true`),
    /// `apply_langfuse_nerdstats_attributes` must flatten the run output into
    /// individual `langfuse.trace.metadata.nerdstats.*` attributes and also
    /// write the aggregate `langfuse.trace.metadata.nerdstats` key.
    #[test]
    fn langfuse_nerdstats_attributes_are_written_when_enabled() {
        // Minimal run output with a single timed LLM step so every token and
        // latency field the nerdstats writer reads is populated.
        let output = YamlWorkflowRunOutput {
            workflow_id: "wf-1".to_string(),
            entry_node: "start".to_string(),
            email_text: "email".to_string(),
            trace: vec!["node-1".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "node-1".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "node-1".to_string(),
                node_kind: "llm_call".to_string(),
                model_name: Some("gpt-4.1-mini".to_string()),
                elapsed_ms: 42,
                prompt_tokens: Some(4),
                completion_tokens: Some(6),
                total_tokens: Some(10),
                reasoning_tokens: Some(2),
                tokens_per_second: Some(14.0),
            }],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 42,
            ttft_ms: Some(7),
            total_input_tokens: 4,
            total_output_tokens: 6,
            total_tokens: 10,
            total_reasoning_tokens: Some(2),
            tokens_per_second: 14.0,
            trace_id: Some("trace-1".to_string()),
            metadata: None,
        };

        let mut span = CapturingSpan::default();
        apply_langfuse_nerdstats_attributes(&mut span, &output, true);

        // Spot-check representative flattened attributes: identity, token
        // totals, and the availability flag. Values are stringified.
        assert_eq!(
            span.attributes
                .get("langfuse.trace.metadata.nerdstats.workflow_id")
                .map(String::as_str),
            Some("wf-1")
        );
        assert_eq!(
            span.attributes
                .get("langfuse.trace.metadata.nerdstats.total_tokens")
                .map(String::as_str),
            Some("10")
        );
        assert_eq!(
            span.attributes
                .get("langfuse.trace.metadata.nerdstats.token_metrics_available")
                .map(String::as_str),
            Some("true")
        );
        // The aggregate key must be present alongside the flattened ones.
        assert!(span
            .attributes
            .contains_key("langfuse.trace.metadata.nerdstats"));
    }
5513
    /// In `FullPayload` mode the trace-level Langfuse attributes must carry
    /// the serialized workflow input, the terminal output, and a usage-details
    /// aggregate.
    #[test]
    fn langfuse_trace_input_output_and_usage_are_written() {
        // Run output with a terminal JSON payload and non-zero token totals so
        // both the output and usage attributes have material to write.
        let output = YamlWorkflowRunOutput {
            workflow_id: "wf-1".to_string(),
            entry_node: "start".to_string(),
            email_text: "email".to_string(),
            trace: vec!["node-1".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "node-1".to_string(),
            terminal_output: Some(json!({"final":"ok"})),
            step_timings: Vec::new(),
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 10,
            ttft_ms: None,
            total_input_tokens: 11,
            total_output_tokens: 22,
            total_tokens: 33,
            total_reasoning_tokens: Some(4),
            tokens_per_second: 1.5,
            trace_id: Some("trace-1".to_string()),
            metadata: None,
        };

        let mut span = CapturingSpan::default();
        let input = json!({"email_text":"hello"});
        apply_langfuse_trace_input_output_attributes(
            &mut span,
            &input,
            &output,
            YamlWorkflowPayloadMode::FullPayload,
        );

        // Input/output are stored as compact JSON strings.
        assert_eq!(
            span.attributes
                .get("langfuse.trace.input")
                .map(String::as_str),
            Some("{\"email_text\":\"hello\"}")
        );
        assert_eq!(
            span.attributes
                .get("langfuse.trace.output")
                .map(String::as_str),
            Some("{\"final\":\"ok\"}")
        );
        // Usage details aggregate must be present (exact shape not asserted).
        assert!(span
            .attributes
            .contains_key("langfuse.trace.metadata.usage_details"));
    }
5563
5564    #[test]
5565    fn langfuse_observation_usage_attributes_are_written() {
5566        let usage = YamlLlmTokenUsage {
5567            prompt_tokens: 7,
5568            completion_tokens: 9,
5569            total_tokens: 16,
5570            reasoning_tokens: Some(3),
5571        };
5572        let mut span = CapturingSpan::default();
5573        apply_langfuse_observation_usage_attributes(&mut span, &usage);
5574
5575        assert_eq!(
5576            span.attributes
5577                .get("gen_ai.usage.input_tokens")
5578                .map(String::as_str),
5579            Some("7")
5580        );
5581        assert_eq!(
5582            span.attributes
5583                .get("gen_ai.usage.output_tokens")
5584                .map(String::as_str),
5585            Some("9")
5586        );
5587        assert_eq!(
5588            span.attributes
5589                .get("gen_ai.usage.total_tokens")
5590                .map(String::as_str),
5591            Some("16")
5592        );
5593        assert_eq!(
5594            span.attributes
5595                .get("gen_ai.usage.reasoning_tokens")
5596                .map(String::as_str),
5597            Some("3")
5598        );
5599        assert!(span
5600            .attributes
5601            .contains_key("langfuse.observation.usage_details"));
5602    }
5603
5604    #[tokio::test]
5605    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
5606        let yaml = r#"
5607id: trace-options-test
5608entry_node: classify
5609nodes:
5610  - id: classify
5611    node_type:
5612      llm_call:
5613        model: gpt-4.1
5614    config:
5615      prompt: |
5616        Classify this email into exactly one category:
5617        {{ input.email_text }}
5618"#;
5619
5620        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5621        let options = YamlWorkflowRunOptions {
5622            telemetry: YamlWorkflowTelemetryConfig {
5623                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
5624                ..YamlWorkflowTelemetryConfig::default()
5625            },
5626            trace: YamlWorkflowTraceOptions {
5627                context: Some(YamlWorkflowTraceContextInput {
5628                    trace_id: Some("trace-fixed-123".to_string()),
5629                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
5630                    ..YamlWorkflowTraceContextInput::default()
5631                }),
5632                tenant: YamlWorkflowTraceTenantContext {
5633                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
5634                    ..YamlWorkflowTraceTenantContext::default()
5635                },
5636            },
5637            model: None,
5638        };
5639
5640        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
5641            &workflow,
5642            &json!({"email_text":"hello"}),
5643            &MockExecutor,
5644            None,
5645            None,
5646            &options,
5647        )
5648        .await
5649        .expect("workflow should execute");
5650
5651        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
5652        assert_eq!(
5653            output
5654                .metadata
5655                .as_ref()
5656                .and_then(|value| value.get("telemetry"))
5657                .and_then(|telemetry| telemetry.get("payload_mode"))
5658                .and_then(Value::as_str),
5659            Some("redacted_payload")
5660        );
5661        assert_eq!(
5662            output
5663                .metadata
5664                .as_ref()
5665                .and_then(|value| value.get("trace"))
5666                .and_then(|trace| trace.get("tenant"))
5667                .and_then(|tenant| tenant.get("conversation_id"))
5668                .and_then(Value::as_str),
5669            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
5670        );
5671    }
5672
5673    #[test]
5674    fn streamed_payload_parser_extracts_last_json_object() {
5675        let raw = r#"{"state":"missing_scenario","reason":"ok"}
5676
5677extra reasoning text
5678
5679{"state":"ready","reason":"final"}"#;
5680
5681        let resolved = parse_streamed_structured_payload(raw, false)
5682            .expect("parser should extract final JSON object");
5683        assert_eq!(resolved.payload["state"], "ready");
5684        assert!(resolved.heal_confidence.is_none());
5685    }
5686
5687    #[test]
5688    fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
5689        let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
5690
5691        let resolved = parse_streamed_structured_payload(raw, false)
5692            .expect("parser should recover final structured JSON object");
5693        assert_eq!(resolved.payload["state"], "ready");
5694    }
5695
5696    #[test]
5697    fn streamed_payload_parser_handles_markdown_with_heal() {
5698        let raw = r#"Some preface
5699```json
5700{
5701  "state": "missing_scenario",
5702  "reason": "Need more details"
5703}
5704```
5705Some trailing explanation"#;
5706
5707        let resolved = parse_streamed_structured_payload(raw, true)
5708            .expect("heal path should parse JSON block");
5709        assert_eq!(resolved.payload["state"], "missing_scenario");
5710        assert!(resolved.heal_confidence.is_some());
5711    }
5712
5713    #[test]
5714    fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
5715        let raw = "No JSON in this streamed output";
5716        let error = parse_streamed_structured_payload(raw, false)
5717            .expect_err("strict stream parse should fail without JSON candidate");
5718        assert!(error.contains("no JSON object candidate found"));
5719    }
5720
5721    #[test]
5722    fn include_raw_stream_debug_events_defaults_to_false() {
5723        let _guard = stream_debug_env_lock()
5724            .lock()
5725            .expect("stream debug env lock should not be poisoned");
5726        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5727        assert!(!include_raw_stream_debug_events());
5728    }
5729
5730    #[test]
5731    fn include_raw_stream_debug_events_accepts_truthy_values() {
5732        let _guard = stream_debug_env_lock()
5733            .lock()
5734            .expect("stream debug env lock should not be poisoned");
5735        std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
5736        assert!(include_raw_stream_debug_events());
5737        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5738    }
5739
5740    #[test]
5741    fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
5742        let mut filter = StructuredJsonDeltaFilter::default();
5743        let chunks = vec![
5744            "I will think first... ",
5745            "{\"state\":\"missing_scenario\",",
5746            "\"reason\":\"Need more details\"}",
5747            " additional commentary",
5748        ];
5749
5750        let filtered = chunks
5751            .into_iter()
5752            .filter_map(|chunk| filter.split(chunk).0)
5753            .collect::<String>();
5754
5755        assert_eq!(
5756            filtered,
5757            "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
5758        );
5759    }
5760
5761    #[test]
5762    fn structured_json_delta_filter_handles_braces_inside_strings() {
5763        let mut filter = StructuredJsonDeltaFilter::default();
5764        let chunks = vec![
5765            "preface ",
5766            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
5767            " trailing",
5768        ];
5769
5770        let filtered = chunks
5771            .into_iter()
5772            .filter_map(|chunk| filter.split(chunk).0)
5773            .collect::<String>();
5774
5775        assert_eq!(
5776            filtered,
5777            "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
5778        );
5779    }
5780
5781    #[test]
5782    fn render_json_object_as_text_converts_top_level_fields() {
5783        let rendered =
5784            render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
5785        let lines: std::collections::HashSet<&str> = rendered.lines().collect();
5786
5787        assert_eq!(lines.len(), 3);
5788        assert!(lines.contains("question: q"));
5789        assert!(lines.contains("confidence: 0.8"));
5790        assert!(lines.contains("nested: {\"a\":1}"));
5791    }
5792
5793    #[test]
5794    fn stream_json_as_text_formatter_emits_once_when_complete() {
5795        let mut formatter = StreamJsonAsTextFormatter::default();
5796        formatter.push("{\"question\":\"hello\"}");
5797
5798        let first = formatter.emit_if_ready(true);
5799        let second = formatter.emit_if_ready(true);
5800
5801        assert_eq!(first, Some("question: hello".to_string()));
5802        assert_eq!(second, None);
5803    }
5804
5805    #[test]
5806    fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
5807        let expr = "$.nodes.classify.output.output_total == 1";
5808        let rewritten = rewrite_yaml_condition_to_ir(expr).expect("condition should rewrite");
5809        assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
5810    }
5811
5812    #[test]
5813    fn rewrite_yaml_condition_does_not_mutate_paths_in_string_literals() {
5814        let expr = "$.nodes.classify.output.state == \"$.nodes.keep.output\"";
5815        let rewritten = rewrite_yaml_condition_to_ir(expr).expect("condition should rewrite");
5816        assert_eq!(
5817            rewritten,
5818            "$.node_outputs.classify.state == \"$.nodes.keep.output\""
5819        );
5820    }
5821
5822    #[test]
5823    fn rewrite_yaml_condition_rejects_unterminated_quotes() {
5824        let err = rewrite_yaml_condition_to_ir("$.nodes.classify.output.state == \"broken")
5825            .expect_err("unterminated quote should fail");
5826        assert!(err.contains("unterminated quoted string"));
5827    }
5828
5829    #[test]
5830    fn yaml_llm_call_rejects_unknown_provider_field() {
5831        let yaml = r#"
5832id: wf
5833entry_node: classify
5834nodes:
5835  - id: classify
5836    node_type:
5837      llm_call:
5838        provider: openai
5839        model: gpt-4.1-mini
5840    config:
5841      prompt: hi
5842"#;
5843
5844        let err = serde_yaml::from_str::<YamlWorkflow>(yaml)
5845            .expect_err("unknown llm_call fields should fail parsing");
5846        let message = err.to_string();
5847        assert!(message.contains("provider"));
5848        assert!(message.contains("unknown field"));
5849    }
5850
5851    #[test]
5852    fn yaml_custom_worker_rejects_unknown_language_field() {
5853        let yaml = r#"
5854id: wf
5855entry_node: worker
5856nodes:
5857  - id: worker
5858    node_type:
5859      custom_worker:
5860        language: python
5861        handler: get_rag_data
5862"#;
5863
5864        let err = serde_yaml::from_str::<YamlWorkflow>(yaml)
5865            .expect_err("unknown custom_worker fields should fail parsing");
5866        let message = err.to_string();
5867        assert!(message.contains("language"));
5868        assert!(message.contains("unknown field"));
5869    }
5870
5871    #[tokio::test]
5872    async fn custom_worker_handler_file_requires_executor() {
5873        let yaml = r#"
5874id: wf
5875entry_node: lookup
5876nodes:
5877  - id: lookup
5878    node_type:
5879      custom_worker:
5880        handler: get_rag_data
5881        handler_file: handlers.py
5882"#;
5883
5884        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5885        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
5886            &workflow,
5887            &json!({"email_text": "hello"}),
5888            &MockExecutor,
5889            None,
5890            None,
5891            &YamlWorkflowRunOptions::default(),
5892        )
5893        .await
5894        .expect_err("handler_file without executor should fail");
5895
5896        let rendered = err.to_string();
5897        assert!(rendered.contains("handler_file"));
5898        assert!(rendered.contains("no custom worker executor configured"));
5899    }
5900
5901    #[tokio::test]
5902    async fn validates_workflow_input_before_ir_runtime_path() {
5903        let yaml = r#"
5904id: chat-workflow
5905entry_node: classify
5906nodes:
5907  - id: classify
5908    node_type:
5909      llm_call:
5910        model: gpt-4.1
5911    config:
5912      prompt: |
5913        classify
5914"#;
5915
5916        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5917        let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
5918            .await
5919            .expect_err("non-object input should fail before execution");
5920
5921        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5922    }
5923
5924    #[test]
5925    fn interpolate_template_supports_dollar_prefixed_paths() {
5926        let context = json!({
5927            "input": {
5928                "email_text": "hello"
5929            }
5930        });
5931
5932        let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
5933        assert_eq!(rendered, "value=hello");
5934    }
5935
    /// `to_typed_output` must tag every node output record with the node kind
    /// declared in the workflow definition, and the terminal output record
    /// with the terminal node's kind.
    #[test]
    fn to_typed_output_maps_node_kinds_from_workflow_definition() {
        // Three-node graph covering one of each kind asserted below:
        // llm_call -> switch -> custom_worker.
        let workflow_yaml = r#"
id: typed-output-demo
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.state == "ready"'
            target: worker
        default: worker
  - id: worker
    node_type:
      custom_worker:
        handler: do_work
edges:
  - from: classify
    to: route
  - from: route
    to: worker
"#;
        let workflow: YamlWorkflow =
            serde_yaml::from_str(workflow_yaml).expect("workflow should parse");
        // Synthetic run output whose trace/outputs cover all three nodes;
        // timing and token fields are irrelevant here and left zeroed.
        let output = YamlWorkflowRunOutput {
            workflow_id: "typed-output-demo".to_string(),
            entry_node: "classify".to_string(),
            email_text: String::new(),
            trace: vec![
                "classify".to_string(),
                "route".to_string(),
                "worker".to_string(),
            ],
            outputs: BTreeMap::from([
                ("classify".to_string(), json!({"state": "ready"})),
                ("route".to_string(), json!("worker")),
                ("worker".to_string(), json!({"ok": true})),
            ]),
            terminal_node: "worker".to_string(),
            terminal_output: Some(json!({"ok": true})),
            step_timings: Vec::new(),
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 0,
            ttft_ms: None,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_tokens: 0,
            total_reasoning_tokens: None,
            tokens_per_second: 0.0,
            trace_id: None,
            metadata: None,
        };

        let typed = output.to_typed_output(&workflow);
        assert_eq!(typed.node_outputs.len(), 3);
        // Each record's node_kind must reflect the definition, not the output.
        assert_eq!(
            typed
                .node_outputs
                .iter()
                .find(|record| record.node_id == "classify")
                .map(|record| record.node_kind),
            Some(YamlWorkflowNodeKind::LlmCall)
        );
        assert_eq!(
            typed
                .node_outputs
                .iter()
                .find(|record| record.node_id == "route")
                .map(|record| record.node_kind),
            Some(YamlWorkflowNodeKind::Switch)
        );
        assert_eq!(
            typed
                .node_outputs
                .iter()
                .find(|record| record.node_id == "worker")
                .map(|record| record.node_kind),
            Some(YamlWorkflowNodeKind::CustomWorker)
        );
        // Terminal record carries the terminal node's kind.
        assert_eq!(
            typed
                .terminal_output
                .as_ref()
                .map(|record| record.node_kind),
            Some(YamlWorkflowNodeKind::CustomWorker)
        );
    }
6029
    /// A node id present in the run output but absent from the workflow
    /// definition must be tagged `Unknown` rather than causing a failure.
    #[test]
    fn to_typed_output_marks_unknown_node_ids() {
        // Definition only declares `start`; the output below references a node
        // id that is not in the graph.
        let workflow_yaml = r#"
id: typed-output-unknown
entry_node: start
nodes:
  - id: start
    node_type:
      llm_call:
        model: gpt-4.1
"#;
        let workflow: YamlWorkflow =
            serde_yaml::from_str(workflow_yaml).expect("workflow should parse");
        let output = YamlWorkflowRunOutput {
            workflow_id: "typed-output-unknown".to_string(),
            entry_node: "start".to_string(),
            email_text: String::new(),
            // `not-in-graph` is deliberately missing from the definition.
            trace: vec!["start".to_string(), "not-in-graph".to_string()],
            outputs: BTreeMap::from([("not-in-graph".to_string(), json!({"v": 1}))]),
            terminal_node: "not-in-graph".to_string(),
            terminal_output: Some(json!({"v": 1})),
            step_timings: Vec::new(),
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 0,
            ttft_ms: None,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_tokens: 0,
            total_reasoning_tokens: None,
            tokens_per_second: 0.0,
            trace_id: None,
            metadata: None,
        };

        let typed = output.to_typed_output(&workflow);
        // Only the one recorded output becomes a typed record.
        assert_eq!(typed.node_outputs.len(), 1);
        assert_eq!(
            typed.node_outputs[0].node_kind,
            YamlWorkflowNodeKind::Unknown
        );
        assert_eq!(
            typed
                .terminal_output
                .as_ref()
                .map(|record| record.node_kind),
            Some(YamlWorkflowNodeKind::Unknown)
        );
    }
6079
6080    #[test]
6081    fn to_typed_event_maps_known_event_type() {
6082        let event = YamlWorkflowEvent {
6083            event_type: "node_completed".to_string(),
6084            node_id: Some("classify".to_string()),
6085            step_id: Some("classify".to_string()),
6086            node_kind: Some("llm_call".to_string()),
6087            streamable: Some(false),
6088            message: Some("done".to_string()),
6089            delta: None,
6090            token_kind: None,
6091            is_terminal_node_token: Some(true),
6092            elapsed_ms: Some(11),
6093            metadata: Some(json!({"k": "v"})),
6094        };
6095
6096        let typed = event.to_typed_event();
6097        assert_eq!(typed.event_type, YamlWorkflowEventType::NodeCompleted);
6098        assert_eq!(typed.raw_event_type, "node_completed");
6099        assert_eq!(typed.node_id.as_deref(), Some("classify"));
6100        assert_eq!(
6101            typed.metadata.as_ref().and_then(|value| value.get("k")),
6102            Some(&json!("v"))
6103        );
6104    }
6105
6106    #[test]
6107    fn to_typed_event_marks_unknown_event_type() {
6108        let event = YamlWorkflowEvent {
6109            event_type: "custom_event_not_recognized".to_string(),
6110            node_id: None,
6111            step_id: None,
6112            node_kind: None,
6113            streamable: None,
6114            message: None,
6115            delta: None,
6116            token_kind: None,
6117            is_terminal_node_token: None,
6118            elapsed_ms: None,
6119            metadata: None,
6120        };
6121
6122        let typed = event.to_typed_event();
6123        assert_eq!(typed.event_type, YamlWorkflowEventType::Unknown);
6124        assert_eq!(typed.raw_event_type, "custom_event_not_recognized");
6125    }
6126}