1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceMode, ToolChoiceTool, ToolDefinition, ToolType,
14};
15use simple_agents_core::{
16 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
17};
18use simple_agents_healing::JsonishParser;
19use thiserror::Error;
20
21use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
22use crate::observability::tracing::{
23 flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
24};
25use crate::runtime::{
26 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
27 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
28 WorkflowRuntimeOptions,
29};
30use crate::validation::validate_and_normalize;
31use crate::visualize::workflow_to_mermaid;
32
33mod runner;
34pub use runner::WorkflowRunner;
35mod api;
36mod client_executor;
37mod context;
38mod execute;
39mod globals;
40mod llm_tools;
41mod validation;
42pub use api::{
43 run_email_workflow_yaml, run_email_workflow_yaml_file,
44 run_email_workflow_yaml_file_with_client,
45 run_email_workflow_yaml_file_with_client_and_custom_worker,
46 run_email_workflow_yaml_file_with_client_and_custom_worker_and_events,
47 run_email_workflow_yaml_with_client, run_email_workflow_yaml_with_client_and_custom_worker,
48 run_email_workflow_yaml_with_client_and_custom_worker_and_events,
49 run_email_workflow_yaml_with_custom_worker,
50 run_email_workflow_yaml_with_custom_worker_and_events, run_workflow_yaml,
51 run_workflow_yaml_file, run_workflow_yaml_file_with_client,
52 run_workflow_yaml_file_with_client_and_custom_worker,
53 run_workflow_yaml_file_with_client_and_custom_worker_and_events,
54 run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options,
55 run_workflow_yaml_with_client, run_workflow_yaml_with_client_and_custom_worker,
56 run_workflow_yaml_with_client_and_custom_worker_and_events,
57 run_workflow_yaml_with_custom_worker, run_workflow_yaml_with_custom_worker_and_events,
58};
59use client_executor::BorrowedClientExecutor;
60use context::{
61 build_yaml_context_from_ir_scope, collect_template_bindings, evaluate_switch_condition,
62 interpolate_template, json_type_name, parse_messages_from_context, resolve_path,
63};
64use globals::{apply_set_globals, apply_update_globals};
65use llm_tools::{
66 default_llm_output_schema, llm_output_schema_for_node, normalize_llm_tools,
67 normalize_tool_choice,
68};
69pub use validation::verify_yaml_workflow;
70
// Sentinel node/tool ids used by the YAML runner; the double-underscore
// prefix keeps them distinct from user-authored ids.
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";
// Upper bounds for workflow YAML input: 1 MiB of text and 64 nesting levels.
// NOTE(review): enforcement appears to live in sibling modules — confirm.
const MAX_WORKFLOW_YAML_BYTES: u64 = 1024 * 1024;
const MAX_WORKFLOW_YAML_DEPTH: usize = 64;

// Process-wide sequence mixed into generated trace ids (see `generate_trace_id`).
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
77
/// Timing (and, for LLM nodes, provider-reported token usage) recorded for
/// one executed workflow node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the executed node.
    pub node_id: String,
    /// Kind label of the node (LLM steps use "llm_call").
    pub node_kind: String,
    /// Model used by the step, when it was an LLM call.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Wall-clock duration of the step in milliseconds.
    pub elapsed_ms: u128,
    // Token counts below are `None` when the provider reported no usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Completion-token throughput; `None` when usage was unavailable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
96
/// Aggregated metrics for one LLM node, keyed by node id in
/// `YamlWorkflowRunOutput::llm_node_metrics`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
107
/// Full result of a YAML workflow run: trace of executed nodes, per-node
/// outputs, aggregated timing/token metrics, and optional telemetry info.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    pub email_text: String,
    /// Node ids in execution order.
    pub trace: Vec<String>,
    /// Per-node output values, keyed by node id.
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    /// Per-step timing/usage details, in execution order.
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    /// Model actually used per LLM node, keyed by node id.
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    /// Time to first token, when streaming provided one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    // Run-wide token totals across all LLM calls.
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    pub tokens_per_second: f64,
    /// Resolved telemetry trace id, when telemetry was enabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
134
/// Whether span payloads are recorded verbatim or replaced by a type-only
/// redaction marker (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
142
/// How tool-call payloads appear in traces: verbatim, type-only redaction,
/// or omitted entirely (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
151
/// Caller-supplied trace propagation fields: explicit ids, W3C-style
/// `traceparent`/`tracestate` headers, and baggage key-value pairs.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
167
/// Tenant/request identifiers attached as span attributes (see
/// `apply_trace_tenant_attributes_from_tenant`) and forwarded to custom
/// workers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
181
/// Telemetry settings for a run. The serde field defaults mirror the
/// hand-written `Default` impl below (everything enabled, sample rate 1.0,
/// 30-day retention, full payloads).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Trace sampling probability in [0.0, 1.0] (see `validate_sample_rate`).
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
199
200impl Default for YamlWorkflowTelemetryConfig {
201 fn default() -> Self {
202 Self {
203 enabled: true,
204 nerdstats: true,
205 sample_rate: 1.0,
206 payload_mode: YamlWorkflowPayloadMode::FullPayload,
207 retention_days: 30,
208 multi_tenant: true,
209 tool_trace_mode: YamlToolTraceMode::Full,
210 }
211 }
212}
213
/// Trace-related run options: optional propagation context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
221
/// Top-level options for a YAML workflow run: telemetry configuration,
/// trace propagation/tenant info, and an optional run-wide model override
/// applied to LLM nodes (see `resolve_requested_model`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    #[serde(default)]
    pub model: Option<String>,
}
231
/// Provider-reported token usage for a single LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only when the provider reports reasoning tokens separately.
    pub reasoning_tokens: Option<u32>,
}
239
/// Outcome of one LLM node execution: the structured payload, optional
/// token usage, optional time-to-first-token, and any tool-call traces.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
247
/// Record of a single tool invocation made while executing an LLM node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    /// Provider-assigned tool-call id.
    pub id: String,
    pub name: String,
    pub arguments: Value,
    /// Tool output, when the call produced one.
    pub output: Option<Value>,
    pub status: String,
    pub elapsed_ms: u128,
    /// Error description, when the call failed.
    pub error: Option<String>,
}
258
/// Running totals of token usage accumulated across all LLM calls in a run.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    // Stays `None` until at least one usage reports reasoning tokens.
    reasoning_tokens: Option<u64>,
}
266
267impl YamlTokenTotals {
268 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
269 self.input_tokens += u64::from(usage.prompt_tokens);
270 self.output_tokens += u64::from(usage.completion_tokens);
271 self.total_tokens += u64::from(usage.total_tokens);
272
273 if let Some(reasoning_tokens) = usage.reasoning_tokens {
274 let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
275 self.reasoning_tokens = Some(next);
276 }
277 }
278
279 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
280 if elapsed_ms == 0 {
281 return 0.0;
282 }
283 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
284 }
285}
286
/// Round a float to two decimal places (half-away-from-zero via `f64::round`).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}
290
291fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
292 if elapsed_ms == 0 {
293 return 0.0;
294 }
295 round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
296}
297
/// Pick the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's own configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => node_model.to_string(),
    }
}
310
// serde default helpers for `YamlWorkflowTelemetryConfig` (serde requires
// functions, not values, for `#[serde(default = "...")]`).
fn default_true() -> bool {
    true
}

fn default_sample_rate() -> f32 {
    1.0
}

fn default_retention_days() -> u32 {
    30
}
322
323fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
324 if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
325 return Ok(());
326 }
327
328 Err(YamlWorkflowRunError::InvalidInput {
329 message: format!(
330 "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
331 ),
332 })
333}
334
/// Deterministic trace sampling: hash the trace id with 64-bit FNV-1a and
/// keep the trace when the normalized hash falls below `sample_rate`. The
/// same trace id always yields the same decision for a given rate.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // 64-bit FNV-1a over the raw bytes of the trace id.
    let hash = trace_id
        .as_bytes()
        .iter()
        .fold(0xcbf29ce484222325u64, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
        });

    ((hash as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
352
/// Extract the trace id from a W3C `traceparent` header of the shape
/// `{version}-{trace_id}-{span_id}-{flags}`. Returns the lowercased
/// 32-hex-digit trace id, or `None` when the header does not have exactly
/// four segments, the version/trace id are malformed, or the trace id is
/// all zeros. The span id and flags segments are not validated.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let segments: Vec<&str> = traceparent.split('-').collect();
    // Exactly four dash-separated segments are required.
    let [version, trace_id, _span_id, _flags] = segments.as_slice() else {
        return None;
    };

    let version_ok = version.len() == 2 && version.chars().all(|ch| ch.is_ascii_hexdigit());
    let trace_id_ok = trace_id.len() == 32 && trace_id.chars().all(|ch| ch.is_ascii_hexdigit());
    if !version_ok || !trace_id_ok {
        return None;
    }

    // An all-zero trace id is invalid per the W3C Trace Context spec.
    if trace_id.bytes().all(|byte| byte == b'0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
379
/// Where a run's trace id came from, in resolution priority order
/// (see `resolve_run_trace_id_with_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    // Telemetry disabled: no trace id at all.
    Disabled,
    // Explicit id in the run options' trace context.
    ExplicitTraceId,
    // Id inherited from a parent trace context.
    ParentTraceId,
    // Parsed from the run options' traceparent header.
    Traceparent,
    // Parsed from the parent context's traceparent header.
    ParentTraceparent,
    // Freshly generated for this run.
    Generated,
}
389
/// Trace id, sampling decision, and id provenance resolved for a run.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    // `None` when telemetry is disabled.
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
396
397fn resolve_run_trace_id_with_source(
398 options: &YamlWorkflowRunOptions,
399 parent_trace_context: Option<&TraceContext>,
400) -> (Option<String>, TraceIdSource) {
401 if !options.telemetry.enabled {
402 return (None, TraceIdSource::Disabled);
403 }
404
405 if let Some(trace_id) = options
406 .trace
407 .context
408 .as_ref()
409 .and_then(|context| context.trace_id.clone())
410 {
411 return (Some(trace_id), TraceIdSource::ExplicitTraceId);
412 }
413
414 if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
415 return (Some(trace_id), TraceIdSource::ParentTraceId);
416 }
417
418 if let Some(trace_id) = options
419 .trace
420 .context
421 .as_ref()
422 .and_then(|context| context.traceparent.as_deref())
423 .and_then(trace_id_from_traceparent)
424 {
425 return (Some(trace_id), TraceIdSource::Traceparent);
426 }
427
428 if let Some(trace_id) = parent_trace_context
429 .and_then(|context| context.traceparent.as_deref())
430 .and_then(trace_id_from_traceparent)
431 {
432 return (Some(trace_id), TraceIdSource::ParentTraceparent);
433 }
434
435 (Some(generate_trace_id()), TraceIdSource::Generated)
436}
437
438fn resolve_telemetry_context(
439 options: &YamlWorkflowRunOptions,
440 parent_trace_context: Option<&TraceContext>,
441) -> ResolvedTelemetryContext {
442 let (trace_id, trace_id_source) =
443 resolve_run_trace_id_with_source(options, parent_trace_context);
444 let sampled = trace_id
445 .as_deref()
446 .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
447 .unwrap_or(false);
448
449 ResolvedTelemetryContext {
450 trace_id,
451 sampled,
452 trace_id_source,
453 }
454}
455
456fn generate_trace_id() -> String {
457 use std::time::{SystemTime, UNIX_EPOCH};
458
459 let now_nanos = SystemTime::now()
460 .duration_since(UNIX_EPOCH)
461 .map(|duration| duration.as_nanos())
462 .unwrap_or(0);
463 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
464 format!("{:032x}", now_nanos ^ sequence)
465}
466
467fn workflow_metadata_with_trace(
468 options: &YamlWorkflowRunOptions,
469 trace_id: &str,
470 sampled: bool,
471 trace_id_source: TraceIdSource,
472) -> Value {
473 json!({
474 "telemetry": {
475 "trace_id": trace_id,
476 "trace_id_source": match trace_id_source {
477 TraceIdSource::Disabled => "disabled",
478 TraceIdSource::ExplicitTraceId => "explicit_trace_id",
479 TraceIdSource::ParentTraceId => "parent_trace_id",
480 TraceIdSource::Traceparent => "traceparent",
481 TraceIdSource::ParentTraceparent => "parent_traceparent",
482 TraceIdSource::Generated => "generated",
483 },
484 "enabled": options.telemetry.enabled,
485 "sampled": sampled,
486 "nerdstats": options.telemetry.nerdstats,
487 "sample_rate": options.telemetry.sample_rate,
488 "payload_mode": match options.telemetry.payload_mode {
489 YamlWorkflowPayloadMode::FullPayload => "full_payload",
490 YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
491 },
492 "retention_days": options.telemetry.retention_days,
493 "multi_tenant": options.telemetry.multi_tenant,
494 "tool_trace_mode": match options.telemetry.tool_trace_mode {
495 YamlToolTraceMode::Full => "full",
496 YamlToolTraceMode::Redacted => "redacted",
497 YamlToolTraceMode::Off => "off",
498 },
499 },
500 "trace": {
501 "tenant": {
502 "workspace_id": options.trace.tenant.workspace_id,
503 "user_id": options.trace.tenant.user_id,
504 "conversation_id": options.trace.tenant.conversation_id,
505 "request_id": options.trace.tenant.request_id,
506 "run_id": options.trace.tenant.run_id,
507 }
508 },
509 })
510}
511
512fn apply_trace_identity_attributes(span: &mut dyn WorkflowSpan, trace_id: Option<&str>) {
513 if let Some(value) = trace_id {
514 span.set_attribute("trace_id", value);
515 }
516}
517
518fn apply_trace_tenant_attributes_from_tenant(
519 span: &mut dyn WorkflowSpan,
520 tenant: &YamlWorkflowTraceTenantContext,
521) {
522 if let Some(workspace_id) = tenant.workspace_id.as_deref() {
523 span.set_attribute("tenant.workspace_id", workspace_id);
524 }
525 if let Some(user_id) = tenant.user_id.as_deref() {
526 span.set_attribute("tenant.user_id", user_id);
527 span.set_attribute("user.id", user_id);
528 span.set_attribute("langfuse.user.id", user_id);
529 }
530 if let Some(conversation_id) = tenant.conversation_id.as_deref() {
531 span.set_attribute("tenant.conversation_id", conversation_id);
532 span.set_attribute("session.id", conversation_id);
533 span.set_attribute("langfuse.session.id", conversation_id);
534 }
535 if let Some(request_id) = tenant.request_id.as_deref() {
536 span.set_attribute("tenant.request_id", request_id);
537 }
538 if let Some(run_id) = tenant.run_id.as_deref() {
539 span.set_attribute("tenant.run_id", run_id);
540 }
541}
542
/// Convenience wrapper: apply tenant attributes from the run options'
/// trace tenant context.
fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
    apply_trace_tenant_attributes_from_tenant(span, &options.trace.tenant);
}
546
547fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
548 let llm_nodes_without_usage: Vec<String> = output
549 .step_timings
550 .iter()
551 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
552 .map(|step| step.node_id.clone())
553 .collect();
554 let token_metrics_available = llm_nodes_without_usage.is_empty();
555 let token_metrics_source = if token_metrics_available {
556 "provider_usage"
557 } else {
558 "provider_stream_usage_unavailable"
559 };
560 let total_input_tokens = if token_metrics_available {
561 json!(output.total_input_tokens)
562 } else {
563 Value::Null
564 };
565 let total_output_tokens = if token_metrics_available {
566 json!(output.total_output_tokens)
567 } else {
568 Value::Null
569 };
570 let total_tokens = if token_metrics_available {
571 json!(output.total_tokens)
572 } else {
573 Value::Null
574 };
575 let total_reasoning_tokens = if token_metrics_available {
576 json!(output.total_reasoning_tokens)
577 } else {
578 Value::Null
579 };
580 let tokens_per_second = if token_metrics_available {
581 json!(output.tokens_per_second)
582 } else {
583 Value::Null
584 };
585
586 json!({
587 "workflow_id": output.workflow_id,
588 "terminal_node": output.terminal_node,
589 "total_elapsed_ms": output.total_elapsed_ms,
590 "ttft_ms": output.ttft_ms,
591 "step_details": output.step_timings,
592 "total_input_tokens": total_input_tokens,
593 "total_output_tokens": total_output_tokens,
594 "total_tokens": total_tokens,
595 "total_reasoning_tokens": total_reasoning_tokens,
596 "tokens_per_second": tokens_per_second,
597 "trace_id": output.trace_id,
598 "token_metrics_available": token_metrics_available,
599 "token_metrics_source": token_metrics_source,
600 "llm_nodes_without_usage": llm_nodes_without_usage,
601 })
602}
603
604fn apply_langfuse_nerdstats_attributes(
605 span: &mut dyn WorkflowSpan,
606 output: &YamlWorkflowRunOutput,
607 enabled: bool,
608) {
609 if !enabled {
610 return;
611 }
612
613 let nerdstats = workflow_nerdstats(output);
614 let nerdstats_json = nerdstats.to_string();
615 span.set_attribute("langfuse.trace.metadata.nerdstats", nerdstats_json.as_str());
616
617 span.set_attribute(
618 "langfuse.trace.metadata.nerdstats.workflow_id",
619 output.workflow_id.as_str(),
620 );
621 span.set_attribute(
622 "langfuse.trace.metadata.nerdstats.terminal_node",
623 output.terminal_node.as_str(),
624 );
625 span.set_attribute(
626 "langfuse.trace.metadata.nerdstats.total_elapsed_ms",
627 output.total_elapsed_ms.to_string().as_str(),
628 );
629 span.set_attribute(
630 "langfuse.trace.metadata.nerdstats.step_details_count",
631 output.step_timings.len().to_string().as_str(),
632 );
633 span.set_attribute(
634 "langfuse.trace.metadata.nerdstats.total_input_tokens",
635 output.total_input_tokens.to_string().as_str(),
636 );
637 span.set_attribute(
638 "langfuse.trace.metadata.nerdstats.total_output_tokens",
639 output.total_output_tokens.to_string().as_str(),
640 );
641 span.set_attribute(
642 "langfuse.trace.metadata.nerdstats.total_tokens",
643 output.total_tokens.to_string().as_str(),
644 );
645 span.set_attribute(
646 "langfuse.trace.metadata.nerdstats.tokens_per_second",
647 output.tokens_per_second.to_string().as_str(),
648 );
649
650 if let Some(ttft_ms) = output.ttft_ms {
651 span.set_attribute(
652 "langfuse.trace.metadata.nerdstats.ttft_ms",
653 ttft_ms.to_string().as_str(),
654 );
655 }
656
657 if let Some(reasoning_tokens) = output.total_reasoning_tokens {
658 span.set_attribute(
659 "langfuse.trace.metadata.nerdstats.total_reasoning_tokens",
660 reasoning_tokens.to_string().as_str(),
661 );
662 }
663
664 let llm_nodes_without_usage_count = output
665 .step_timings
666 .iter()
667 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
668 .count();
669 let token_metrics_available = llm_nodes_without_usage_count == 0;
670 span.set_attribute(
671 "langfuse.trace.metadata.nerdstats.token_metrics_available",
672 if token_metrics_available {
673 "true"
674 } else {
675 "false"
676 },
677 );
678 span.set_attribute(
679 "langfuse.trace.metadata.nerdstats.token_metrics_source",
680 if token_metrics_available {
681 "provider_usage"
682 } else {
683 "provider_stream_usage_unavailable"
684 },
685 );
686 span.set_attribute(
687 "langfuse.trace.metadata.nerdstats.llm_nodes_without_usage_count",
688 llm_nodes_without_usage_count.to_string().as_str(),
689 );
690}
691
692fn apply_langfuse_trace_input_output_attributes(
693 span: &mut dyn WorkflowSpan,
694 workflow_input: &Value,
695 output: &YamlWorkflowRunOutput,
696 payload_mode: YamlWorkflowPayloadMode,
697) {
698 let trace_input = payload_for_span(payload_mode, workflow_input);
699 span.set_attribute("langfuse.trace.input", trace_input.as_str());
700
701 if let Some(terminal_output) = output.terminal_output.as_ref() {
702 let trace_output = payload_for_span(payload_mode, terminal_output);
703 span.set_attribute("langfuse.trace.output", trace_output.as_str());
704 }
705
706 let usage_details = json!({
707 "input": output.total_input_tokens,
708 "output": output.total_output_tokens,
709 "total": output.total_tokens,
710 "reasoning": output.total_reasoning_tokens,
711 })
712 .to_string();
713 span.set_attribute(
714 "langfuse.trace.metadata.usage_details",
715 usage_details.as_str(),
716 );
717}
718
719fn apply_langfuse_observation_usage_attributes(
720 span: &mut dyn WorkflowSpan,
721 usage: &YamlLlmTokenUsage,
722) {
723 let usage_details = json!({
724 "input": usage.prompt_tokens,
725 "output": usage.completion_tokens,
726 "total": usage.total_tokens,
727 "reasoning": usage.reasoning_tokens,
728 })
729 .to_string();
730 span.set_attribute("langfuse.observation.usage_details", usage_details.as_str());
731 span.set_attribute(
732 "gen_ai.usage.input_tokens",
733 usage.prompt_tokens.to_string().as_str(),
734 );
735 span.set_attribute(
736 "gen_ai.usage.output_tokens",
737 usage.completion_tokens.to_string().as_str(),
738 );
739 span.set_attribute(
740 "gen_ai.usage.total_tokens",
741 usage.total_tokens.to_string().as_str(),
742 );
743 if let Some(reasoning_tokens) = usage.reasoning_tokens {
744 span.set_attribute(
745 "gen_ai.usage.reasoning_tokens",
746 reasoning_tokens.to_string().as_str(),
747 );
748 }
749}
750
751fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
752 match mode {
753 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
754 YamlWorkflowPayloadMode::RedactedPayload => json!({
755 "redacted": true,
756 "value_type": match payload {
757 Value::Null => "null",
758 Value::Bool(_) => "bool",
759 Value::Number(_) => "number",
760 Value::String(_) => "string",
761 Value::Array(_) => "array",
762 Value::Object(_) => "object",
763 }
764 })
765 .to_string(),
766 }
767}
768
769fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
770 match mode {
771 YamlToolTraceMode::Full => payload.clone(),
772 YamlToolTraceMode::Redacted => json!({
773 "redacted": true,
774 "value_type": json_type_name(payload),
775 }),
776 YamlToolTraceMode::Off => Value::Null,
777 }
778}
779
780fn validate_json_schema(schema: &Value) -> Result<(), String> {
781 jsonschema::JSONSchema::compile(schema)
782 .map(|_| ())
783 .map_err(|error| format!("invalid JSON schema: {error}"))
784}
785
786fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
787 let validator = jsonschema::JSONSchema::compile(schema)
788 .map_err(|error| format!("invalid JSON schema: {error}"))?;
789 if let Err(errors) = validator.validate(instance) {
790 let message = errors
791 .into_iter()
792 .next()
793 .map(|error| error.to_string())
794 .unwrap_or_else(|| "unknown schema validation error".to_string());
795 return Err(format!("schema validation failed: {message}"));
796 }
797 Ok(())
798}
799
800fn schema_type(schema: &Value) -> Option<&str> {
801 schema.get("type").and_then(Value::as_str)
802}
803
804fn schema_expects_object(schema: &Value) -> bool {
805 schema_type(schema) == Some("object")
806}
807
808fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
809 options.trace.context.as_ref().map(|input| TraceContext {
810 trace_id: input.trace_id.clone(),
811 span_id: input.span_id.clone(),
812 parent_span_id: input.parent_span_id.clone(),
813 traceparent: input.traceparent.clone(),
814 tracestate: input.tracestate.clone(),
815 baggage: input.baggage.clone(),
816 })
817}
818
819fn merged_trace_context_for_worker(
820 span_context: Option<&TraceContext>,
821 resolved_trace_id: Option<&str>,
822 options: &YamlWorkflowRunOptions,
823) -> TraceContext {
824 let input_context = options.trace.context.as_ref();
825 let baggage = if let Some(context) = span_context {
826 if !context.baggage.is_empty() {
827 context.baggage.clone()
828 } else {
829 input_context
830 .map(|value| value.baggage.clone())
831 .unwrap_or_default()
832 }
833 } else {
834 input_context
835 .map(|value| value.baggage.clone())
836 .unwrap_or_default()
837 };
838
839 TraceContext {
840 trace_id: span_context
841 .and_then(|context| context.trace_id.clone())
842 .or_else(|| resolved_trace_id.map(|value| value.to_string()))
843 .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
844 span_id: span_context
845 .and_then(|context| context.span_id.clone())
846 .or_else(|| input_context.and_then(|context| context.span_id.clone())),
847 parent_span_id: span_context
848 .and_then(|context| context.parent_span_id.clone())
849 .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
850 traceparent: span_context
851 .and_then(|context| context.traceparent.clone())
852 .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
853 tracestate: span_context
854 .and_then(|context| context.tracestate.clone())
855 .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
856 baggage,
857 }
858}
859
860fn custom_worker_context_with_trace(
861 context: &Value,
862 trace_context: &TraceContext,
863 tenant_context: &YamlWorkflowTraceTenantContext,
864) -> Value {
865 let mut context_with_trace = context.clone();
866 let Some(root) = context_with_trace.as_object_mut() else {
867 return context_with_trace;
868 };
869
870 root.insert(
871 "trace".to_string(),
872 json!({
873 "context": {
874 "trace_id": trace_context.trace_id,
875 "span_id": trace_context.span_id,
876 "parent_span_id": trace_context.parent_span_id,
877 "traceparent": trace_context.traceparent,
878 "tracestate": trace_context.tracestate,
879 "baggage": trace_context.baggage,
880 },
881 "tenant": {
882 "workspace_id": tenant_context.workspace_id,
883 "user_id": tenant_context.user_id,
884 "conversation_id": tenant_context.conversation_id,
885 "request_id": tenant_context.request_id,
886 "run_id": tenant_context.run_id,
887 }
888 }),
889 );
890
891 context_with_trace
892}
893
/// Debug switch: raw stream events are included only when the
/// `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` environment variable is set
/// to a truthy value (1/true/yes/on, case-insensitive, whitespace ignored).
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
903
/// Result of turning a raw streamed completion into a structured JSON payload.
#[derive(Debug)]
struct StreamedPayloadResolution {
    // The parsed JSON payload.
    payload: Value,
    // Healing confidence when JSON healing was applied; `None` for strict parses.
    heal_confidence: Option<f32>,
}
909
/// Accumulates streamed JSON text and renders it as plain "key: value"
/// lines exactly once, when the stream completes (see `emit_if_ready`).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    // Concatenation of all chunks received so far.
    raw_json: String,
    // Set once the rendered text has been emitted, preventing re-emission.
    emitted: bool,
}
915
916impl StreamJsonAsTextFormatter {
917 fn push(&mut self, chunk: &str) {
918 self.raw_json.push_str(chunk);
919 }
920
921 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
922 if self.emitted || !complete {
923 return None;
924 }
925 self.emitted = true;
926 Some(render_json_object_as_text(self.raw_json.as_str()))
927 }
928}
929
930fn render_json_object_as_text(raw_json: &str) -> String {
931 let value = match serde_json::from_str::<Value>(raw_json) {
932 Ok(value) => value,
933 Err(_) => return raw_json.to_string(),
934 };
935 let Some(object) = value.as_object() else {
936 return raw_json.to_string();
937 };
938
939 let mut lines = Vec::with_capacity(object.len());
940 for (key, value) in object {
941 let rendered = match value {
942 Value::String(text) => text.clone(),
943 _ => value.to_string(),
944 };
945 lines.push(format!("{key}: {rendered}"));
946 }
947 lines.join("\n")
948}
949
/// Classifies a streamed token as structured output or "thinking" text;
/// serialized as "output" / "thinking".
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
956
/// Streaming state machine that separates the top-level JSON object in an
/// LLM delta stream from surrounding free text (see `split`).
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    // True once the opening '{' has been seen.
    started: bool,
    // True once the matching top-level '}' has been seen.
    completed: bool,
    // Current brace nesting depth inside the object.
    depth: u32,
    // True while inside a JSON string literal.
    in_string: bool,
    // True when the previous character inside a string was a backslash.
    escape: bool,
}
965
impl StructuredJsonDeltaFilter {
    /// Partition a streamed text delta into (structured output, thinking).
    ///
    /// Everything before the first top-level `{`, and everything after that
    /// object's matching `}`, is routed to the "thinking" channel; the
    /// balanced object text itself is routed to "output". String/escape
    /// state is tracked so braces inside JSON string literals do not change
    /// the nesting depth. State persists across calls, so the delta stream
    /// may be chunked at arbitrary boundaries.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            // After the object closed, all remaining text is thinking.
            if self.completed {
                thinking.push(ch);
                continue;
            }

            // Before the first '{', text is thinking; the '{' opens output.
            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            if self.in_string {
                // Inside a JSON string: only an unescaped '"' ends it.
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// True once the top-level JSON object has been fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
1041
/// Return the trimmed contents of the last ```json fenced block in `raw`,
/// or `None` when there is no complete, non-empty fenced block.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const FENCE_OPEN: &str = "```json";
    const FENCE_CLOSE: &str = "```";

    let start = raw.rfind(FENCE_OPEN)?;
    let body = &raw[start + FENCE_OPEN.len()..];
    let end = body.find(FENCE_CLOSE)?;

    let candidate = body[..end].trim();
    (!candidate.is_empty()).then_some(candidate)
}
1052
/// Scan forward from `start_index` (callers position this on a `{`) and
/// return the trimmed slice up to the first point where brace nesting
/// returns to zero. JSON string literals are skipped so braces inside
/// strings do not affect the depth. Returns `None` when no balancing
/// close brace is found, or when a `}` appears before any `{`.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut depth = 0u32;
    let mut in_string = false;
    let mut escape = false;

    for (relative_index, ch) in raw[start_index..].char_indices() {
        if in_string {
            if escape {
                escape = false;
            } else {
                match ch {
                    '\\' => escape = true,
                    '"' => in_string = false,
                    _ => {}
                }
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => depth = depth.saturating_add(1),
            // A close brace before any open brace means no balanced object.
            '}' if depth == 0 => return None,
            '}' => {
                depth -= 1;
                if depth == 0 {
                    let end_index = start_index + relative_index + ch.len_utf8();
                    return Some(raw[start_index..end_index].trim());
                }
            }
            _ => {}
        }
    }

    None
}
1093
1094fn extract_last_parsable_object(raw: &str) -> Option<&str> {
1095 let starts: Vec<usize> = raw
1096 .char_indices()
1097 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
1098 .collect();
1099
1100 for start in starts.into_iter().rev() {
1101 let Some(candidate) = extract_balanced_object_from(raw, start) else {
1102 continue;
1103 };
1104 if serde_json::from_str::<Value>(candidate).is_ok() {
1105 return Some(candidate);
1106 }
1107 }
1108
1109 None
1110}
1111
1112fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
1113 extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
1114}
1115
/// Parses the accumulated text of a streamed structured completion into a
/// JSON payload.
///
/// With `heal == false`: try the raw text as strict JSON, then extract the
/// best candidate (last fenced ```json block, else last parsable `{…}`) and
/// parse it strictly; any failure becomes an error string. With
/// `heal == true`: run the candidate (or the raw text when no candidate is
/// found) through the lenient `JsonishParser` and surface its confidence.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the model may already have produced clean JSON.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: fall back to the whole raw text so the lenient parser
    // still gets a chance even when no candidate object was located.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
1154
/// A progress/telemetry event emitted while a YAML workflow runs. Fields are
/// optional because different `event_type`s populate different subsets
/// (node lifecycle, streamed token deltas, messages, timings, metadata).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    pub event_type: String,
    // Node/step identity for node-scoped events.
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    // Incremental text chunk for streaming token events.
    pub delta: Option<String>,
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
1169
1170pub type WorkflowMessageRole = Role;
1171
/// A chat message passed into/out of workflow LLM calls. `tool_call_id` also
/// deserializes from camelCase `toolCallId` for external payload
/// compatibility; `name` and `tool_call_id` default to `None` when absent.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
1181
/// Records how one template expression in a prompt was resolved: its position
/// (`index`), the expression text, the path the value was read from, the
/// resolved JSON value and its type name, and whether the source was missing.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    pub index: usize,
    pub expression: String,
    pub source_path: String,
    pub resolved: Value,
    pub resolved_type: String,
    pub missing: bool,
}
1191
/// Severity level of a workflow validation diagnostic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}
1197
/// One validation finding for a YAML workflow, optionally attributed to a
/// specific node; `code` is a machine-readable identifier, `message` is text.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    pub node_id: Option<String>,
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    pub message: String,
}
1205
/// Errors raised while loading, validating, converting, or executing a YAML
/// workflow. Display strings come from the `thiserror` attribute on each
/// variant; `Validation` additionally carries the full diagnostic list.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    // File-level guard failures (not a regular file, bad extension, too
    // large, nesting too deep).
    #[error("rejected workflow yaml '{path}': {reason}")]
    FileRejected { path: String, reason: String },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1252
/// Receiver for [`YamlWorkflowEvent`]s emitted during a run.
///
/// `is_cancelled` lets the sink cooperatively cancel the workflow; the
/// default implementation never cancels.
pub trait YamlWorkflowEventSink: Send + Sync {
    fn emit(&self, event: &YamlWorkflowEvent);

    fn is_cancelled(&self) -> bool {
        false
    }
}
1260
/// Event sink that discards every event (and, via the trait default, never
/// requests cancellation).
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1266
/// Single shared cancellation message so every emit site reports identically.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const MESSAGE: &str = "workflow event callback cancelled";
    MESSAGE
}
1270
1271fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1272 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1273}
1274
/// Reasons a YAML workflow cannot be converted to the canonical IR. Callers
/// (see `try_run_yaml_via_ir_runtime`) treat `UnsupportedNode` and
/// `MultipleOutgoingEdge` as a signal to fall back to the YAML interpreter.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1284
1285pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1287 if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1288 return workflow_to_mermaid(&ir);
1289 }
1290
1291 yaml_workflow_to_mermaid_fallback(workflow)
1292}
1293
/// Direct YAML→Mermaid renderer used when IR conversion is unavailable.
///
/// Emits one node per workflow node (labeled "id\n(kind)"), a dotted edge to
/// a rounded node per LLM tool, plain edges for YAML edges, labeled edges for
/// switch branches (plus a "default" edge), and a styling class for tool
/// nodes. Edges are deduplicated via a set and sorted for stable output.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "    {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        // Render each declared LLM tool as a separate rounded node with a
        // dotted edge from its owning node.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "    {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "    {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // (from, label, to) set deduplicates edges; empty label = unlabeled edge.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute one labeled edge per branch plus the default.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output across runs (HashSet order is random).
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "    {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "    {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    if !tool_node_ids.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1375
1376fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1377 llm.tools
1378 .iter()
1379 .map(|tool| match tool {
1380 YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1381 YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1382 })
1383 .collect()
1384}
1385
/// Renders a workflow YAML file as a Mermaid flowchart. Sibling workflow
/// files referenced via `run_workflow_graph` tools are discovered and drawn
/// as subgraphs; without references, renders the single workflow directly.
pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
    let (canonical_path, workflow) = load_workflow_yaml_file(workflow_path)?;

    let referenced_subgraphs = discover_referenced_subgraphs(canonical_path.as_path(), &workflow)?;
    if referenced_subgraphs.is_empty() {
        return Ok(yaml_workflow_to_mermaid(&workflow));
    }

    Ok(yaml_workflow_to_mermaid_with_subgraphs(
        &workflow,
        &referenced_subgraphs,
    ))
}
1400
/// A sibling workflow referenced by the main one, paired with the alias
/// (the reference string) used to label its subgraph.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    alias: String,
    workflow: YamlWorkflow,
}
1406
/// Mermaid output for one workflow rendered under an id prefix: the body
/// lines (unindented), all tool node ids (for class styling), the subset of
/// tool nodes that call `run_workflow_graph`, and the prefixed entry node id.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    lines: Vec<String>,
    tool_node_ids: Vec<String>,
    run_workflow_tool_node_ids: Vec<String>,
    entry_node_id: String,
}
1414
/// Renders the main workflow plus referenced sibling workflows as Mermaid
/// subgraphs, linking `run_workflow_graph` tool nodes to subgraph entries.
///
/// NOTE(review): every `run_workflow_graph` tool node in the MAIN block is
/// linked to EVERY subgraph's entry node, regardless of which workflow that
/// particular tool actually targets — confirm this fan-out is intentional.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Main workflow in its own subgraph cluster, indented one level.
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "    subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("        {line}"));
    }
    lines.push("    end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        // Unique per-subgraph prefix keeps node ids disjoint across blocks.
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "    subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("        {line}"));
        }
        lines.push("    end".to_string());

        // Dotted "calls" edges from the main block's run_workflow tool nodes
        // into this subgraph's entry node.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "    {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    if !all_tool_nodes.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1465
/// Renders one workflow's nodes/edges as Mermaid lines, with every node id
/// namespaced under `prefix` so multiple blocks can share one diagram.
///
/// Also records tool node ids (for styling) and which of them declare the
/// `run_workflow_graph` tool (so the caller can draw cross-graph edges).
/// Edge emission mirrors `yaml_workflow_to_mermaid_fallback`: dedup via a
/// set, then sort for deterministic output.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        // One rounded node + dotted edge per declared LLM tool.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                // Track sub-workflow call sites for cross-graph linking.
                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // (from, label, to) set deduplicates edges; empty label = plain edge.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for stable diagram output.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1541
/// Resolves the workflow ids referenced by `workflow` (via
/// `run_workflow_graph` tools) against sibling YAML files in the same
/// directory, returning each match once (deduplicated by normalized key).
fn discover_referenced_subgraphs(
    workflow_path: &Path,
    workflow: &YamlWorkflow,
) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
    let workflow_ids = referenced_workflow_ids(workflow);
    if workflow_ids.is_empty() {
        return Ok(Vec::new());
    }

    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;

    let mut discovered = Vec::new();
    let mut seen = HashSet::new();

    for workflow_id in workflow_ids {
        // Case-/dash-insensitive matching against sibling keys.
        let normalized = normalize_workflow_lookup_key(&workflow_id);
        if seen.contains(&normalized) {
            continue;
        }

        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
        {
            discovered.push(MermaidSubgraphWorkflow {
                // Keep the original reference spelling for the diagram label.
                alias: workflow_id.clone(),
                workflow: subworkflow.clone(),
            });
            seen.insert(normalized);
        }
    }

    Ok(discovered)
}
1575
/// Loads every other YAML workflow file in `parent_dir`, skipping
/// `workflow_path` itself (compared via canonical paths). Each workflow is
/// indexed under two normalized keys: its file stem and its declared id.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    // Canonicalize best-effort so self-comparison works through symlinks.
    let canonical_target =
        std::fs::canonicalize(workflow_path).unwrap_or_else(|_| workflow_path.to_path_buf());
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // Never index the workflow that is being rendered itself.
        let canonical_path = std::fs::canonicalize(&path).unwrap_or(path.clone());
        if canonical_path == canonical_target {
            continue;
        }

        let (_, subworkflow) = load_workflow_yaml_file(path.as_path())?;

        // Index by file stem AND by declared workflow id so references can
        // use either form.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1613
/// Collects workflow ids referenced by any `run_workflow_graph` tool on the
/// workflow's LLM nodes — from the tool's JSON schema (`const`/`enum` on the
/// `workflow_id` property) and from `"workflow_id": "…"` snippets embedded in
/// the node's prompt. Ids are deduplicated by normalized key, preserving the
/// first-seen original spelling and discovery order.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                // Schema-declared ids.
                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    let normalized = normalize_workflow_lookup_key(&workflow_id);
                    if seen.insert(normalized) {
                        ids.push(workflow_id);
                    }
                }

                // Prompt-embedded ids (scanned once per run_workflow tool).
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        let normalized = normalize_workflow_lookup_key(&workflow_id);
                        if seen.insert(normalized) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1651
1652fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1653 let schema = match tool {
1654 YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1655 YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1656 };
1657
1658 let mut ids = Vec::new();
1659 if let Some(schema) = schema {
1660 if let Some(workflow_prop) = schema
1661 .get("properties")
1662 .and_then(Value::as_object)
1663 .and_then(|properties| properties.get("workflow_id"))
1664 {
1665 if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1666 ids.push(value.to_string());
1667 }
1668
1669 if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1670 for value in enum_values {
1671 if let Some(value) = value.as_str() {
1672 ids.push(value.to_string());
1673 }
1674 }
1675 }
1676 }
1677 }
1678
1679 ids
1680}
1681
/// Scans free-form prompt text for `"workflow_id": "<id>"` occurrences and
/// returns the quoted, non-empty ids in order of appearance. Unquoted or
/// unterminated values are skipped and the scan continues past them.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut ids = Vec::new();
    let mut cursor = prompt;

    loop {
        let Some(key_at) = cursor.find(KEY) else { break };
        let after_key = &cursor[key_at + KEY.len()..];
        let Some(colon_at) = after_key.find(':') else { break };
        let after_colon = &after_key[colon_at + 1..];

        let value = after_colon.trim_start();
        let quoted = value
            .strip_prefix('"')
            .and_then(|rest| rest.find('"').map(|end| (rest, end)));

        match quoted {
            Some((rest, end)) => {
                let workflow_id = rest[..end].trim();
                if !workflow_id.is_empty() {
                    ids.push(workflow_id.to_string());
                }
                // Resume scanning right after the closing quote.
                cursor = &rest[end + 1..];
            }
            // Non-string value: skip past the colon and keep scanning.
            None => cursor = after_colon,
        }
    }

    ids
}
1709
/// Normalizes a workflow id/file stem for lookup: lowercase ASCII letters
/// and dashes folded to underscores, so `My-Flow` and `my_flow` match.
fn normalize_workflow_lookup_key(value: &str) -> String {
    let mut key = String::with_capacity(value.len());
    for ch in value.chars() {
        key.push(if ch == '-' { '_' } else { ch.to_ascii_lowercase() });
    }
    key
}
1719
/// True when the path's extension is exactly `yaml` or `yml` (case-sensitive,
/// matching the original `matches!` behavior).
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1726
1727fn yaml_value_depth(value: &serde_yaml::Value, depth: usize) -> usize {
1728 match value {
1729 serde_yaml::Value::Sequence(items) => items
1730 .iter()
1731 .map(|item| yaml_value_depth(item, depth + 1))
1732 .max()
1733 .unwrap_or(depth),
1734 serde_yaml::Value::Mapping(map) => map
1735 .values()
1736 .map(|item| yaml_value_depth(item, depth + 1))
1737 .max()
1738 .unwrap_or(depth),
1739 _ => depth,
1740 }
1741}
1742
/// Loads and defensively validates a workflow YAML file.
///
/// Guards, in order: path must canonicalize; must be a regular file; must
/// have a `.yaml`/`.yml` extension; must not exceed `MAX_WORKFLOW_YAML_BYTES`;
/// parsed YAML nesting must not exceed `MAX_WORKFLOW_YAML_DEPTH`. The file is
/// parsed to a generic `serde_yaml::Value` first (so depth can be checked
/// before typed deserialization), then converted into `YamlWorkflow`.
/// Returns the canonical path alongside the workflow.
fn load_workflow_yaml_file(
    workflow_path: &Path,
) -> Result<(PathBuf, YamlWorkflow), YamlWorkflowRunError> {
    let canonical_path =
        std::fs::canonicalize(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let metadata =
        std::fs::metadata(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
            path: canonical_path.display().to_string(),
            source,
        })?;

    if !metadata.is_file() {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: "path must reference a regular file".to_string(),
        });
    }

    if !is_yaml_file(&canonical_path) {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: "workflow file extension must be .yaml or .yml".to_string(),
        });
    }

    // Size limit is checked from metadata before reading the file contents.
    if metadata.len() > MAX_WORKFLOW_YAML_BYTES {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: format!(
                "workflow yaml is too large ({} bytes > {} bytes)",
                metadata.len(),
                MAX_WORKFLOW_YAML_BYTES
            ),
        });
    }

    let contents =
        std::fs::read_to_string(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
            path: canonical_path.display().to_string(),
            source,
        })?;

    // Stage 1: untyped parse so nesting depth can be bounded first.
    let yaml_value: serde_yaml::Value =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: canonical_path.display().to_string(),
            source,
        })?;

    let depth = yaml_value_depth(&yaml_value, 1);
    if depth > MAX_WORKFLOW_YAML_DEPTH {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: format!(
                "workflow yaml nesting depth {} exceeds limit {}",
                depth, MAX_WORKFLOW_YAML_DEPTH
            ),
        });
    }

    // Stage 2: typed deserialization into the workflow model.
    let workflow: YamlWorkflow =
        serde_yaml::from_value(yaml_value).map_err(|source| YamlWorkflowRunError::Parse {
            path: canonical_path.display().to_string(),
            source,
        })?;

    Ok((canonical_path, workflow))
}
1814
1815fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1816 sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1817}
1818
1819fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
1820 match tool {
1821 YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
1822 YamlToolDeclaration::Simplified(simple) => &simple.name,
1823 }
1824}
1825
/// Converts a YAML workflow into the canonical IR `WorkflowDefinition`.
///
/// A synthetic Start node pointing at the YAML entry node is prepended.
/// LLM nodes become IR Tool nodes targeting the built-in `YAML_LLM_TOOL_ID`
/// (with the call parameters packed into the tool input); custom workers
/// become Tool nodes for their handler; switch nodes become Router nodes with
/// rewritten conditions. Nodes using `set_globals`/`update_globals`, LLM
/// nodes with tools, and any other node shape are rejected with
/// `UnsupportedNode` (callers fall back to the YAML interpreter); LLM/tool
/// nodes with more than one outgoing edge yield `MultipleOutgoingEdge`.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency of YAML edges, used to derive each node's single `next`.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    // +1 for the synthetic Start node.
    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet — bail out so the
            // caller falls back to the YAML interpreter.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            // Tool-calling LLM nodes likewise have no IR representation yet.
            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // The LLM call is modeled as a Tool node whose input carries all
            // call parameters; the runtime dispatches on YAML_LLM_TOOL_ID.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as LLM nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch branches map 1:1 to router routes; conditions are
            // rewritten from YAML paths to IR node_outputs paths.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1973
1974fn single_next_for_node(
1975 outgoing: &HashMap<&str, Vec<&str>>,
1976 node_id: &str,
1977) -> Result<Option<String>, YamlToIrError> {
1978 match outgoing.get(node_id) {
1979 None => Ok(None),
1980 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1981 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1982 node_id: node_id.to_string(),
1983 }),
1984 }
1985}
1986
/// Rewrites a YAML switch condition into IR path syntax: `$.nodes.` becomes
/// `$.node_outputs.`, and the `.output` path segment (infix or trailing)
/// is dropped.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let mut rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    if rewritten.ends_with(".output") {
        rewritten.truncate(rewritten.len() - ".output".len());
    }
    rewritten
}
1997
/// Makes `id` safe as a Mermaid node identifier: prefixes `n_` when the first
/// character is not an ASCII letter or underscore (including the empty
/// string), then replaces every non-alphanumeric, non-underscore character
/// with `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_legally = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut sanitized = String::with_capacity(id.len() + 2);
    if !starts_legally {
        // The prefix itself is already in the legal alphabet.
        sanitized.push_str("n_");
    }
    for ch in id.chars() {
        sanitized.push(if ch.is_ascii_alphanumeric() || ch == '_' {
            ch
        } else {
            '_'
        });
    }
    sanitized
}
2021
/// Escapes double quotes in a Mermaid label so it can sit inside `["…"]`.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
2025
/// Everything an executor needs to run one LLM node: model/prompt/messages,
/// structured-output schema, streaming and healing flags, resolved tools and
/// tool-call policy, plus tracing/tenant metadata for telemetry.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    pub node_id: String,
    // True for the last node — NOTE(review): consumer behavior not visible
    // in this chunk; presumably affects token/event flagging.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    pub model: String,
    // Explicit message history; when present it seeds the conversation.
    pub messages: Option<Vec<Message>>,
    pub append_prompt_as_user: bool,
    // Rendered prompt, its source template, and per-expression bindings.
    pub prompt: String,
    pub prompt_template: String,
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    pub schema: Value,
    pub stream: bool,
    pub heal: bool,
    // Tooling configuration for this call.
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    pub execution_context: Value,
    pub email_text: String,
    // Tracing/tenant metadata propagated into spans.
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub tenant_context: YamlWorkflowTraceTenantContext,
    pub trace_sampled: bool,
}
2052
/// A tool made available to an LLM node: its wire-format definition plus an
/// optional schema associated with the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    pub output_schema: Option<Value>,
}
2058
/// Executes one structured LLM completion for a workflow node, optionally
/// emitting streaming/progress events into `event_sink`. Errors are reported
/// as strings and wrapped into `YamlWorkflowRunError::Llm` by the runner.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
2067
/// Executes a custom-worker node: `handler` names the worker, `payload` is
/// the node's configured input, and `email_text`/`context` carry the run's
/// input and accumulated execution context. Returns the worker's JSON output
/// or an error string (wrapped into `YamlWorkflowRunError::CustomWorker`).
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
2078
/// Runs `workflow` against a borrowed `SimpleAgentsClient`: wraps the client
/// (plus optional custom worker and run options) in a
/// `BorrowedClientExecutor` and delegates to the executor-based entry point.
pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let executor = BorrowedClientExecutor {
        client,
        custom_worker,
        run_options: options.clone(),
    };
    run_workflow_yaml_with_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        &executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2102
/// Public executor-based entry point; thin delegation to the implementation
/// in the `execute` submodule.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    execute::run_workflow_yaml_with_custom_worker_and_events_and_options_impl(
        workflow,
        workflow_input,
        executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2121
/// Attempts to run a YAML workflow through the IR-based `WorkflowRuntime`.
///
/// Returns `Ok(None)` when the workflow cannot be represented in the IR
/// (unsupported node kinds, multiple outgoing edges, or IR validation
/// failure) so the caller can fall back to the legacy execution path.
/// Other conversion errors surface as `YamlWorkflowRunError::InvalidInput`,
/// and runtime failures as `YamlWorkflowRunError::IrRuntime`.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        // These two conversion failures mean "not expressible in the IR",
        // not "broken workflow" — signal fallback instead of erroring.
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    // An IR that fails validation is also handled by the fallback path.
    if validate_and_normalize(&ir).is_err() {
        return Ok(None);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open the root "workflow.run" span only when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // The IR runtime requires an `LlmExecutor`, but in this bridge every LLM
    // call is routed through the tool path (`YAML_LLM_TOOL_ID` below), so a
    // direct LLM execution indicates a wiring bug.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Adapter that exposes the YAML-level executors to the IR runtime's
    // `ToolExecutor` interface. Mutex-guarded maps accumulate per-node token
    // usage and model names across tool invocations so they can be folded
    // into the final run output.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // Branch 1: the synthetic LLM tool. Decode the llm_call node's
            // configuration from the tool input and delegate to the YAML
            // LLM executor.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                // A run-level model override takes precedence over the
                // model declared on the node.
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // When a messages_path is configured, the conversation is
                // loaded from the execution context instead of being built
                // from the prompt alone.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    // Tool use inside llm_call nodes is not supported on
                    // the IR path; such workflows never reach this adapter.
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    tenant_context: self.tenant_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record token usage and the model actually used, keyed by
                // node id, for the final run metrics. Poisoned locks are
                // ignored (best-effort metrics).
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Branch 2: any other tool id is a custom-worker handler.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-handler child span, only when sampled.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                apply_trace_identity_attributes(span.as_mut(), self.trace_id.as_deref());
                span.set_attribute("handler_name", input.tool.as_str());
                apply_trace_tenant_attributes_from_tenant(span.as_mut(), &self.tenant_context);
                span.set_attribute(
                    "node_input",
                    // payload_mode controls redaction of the recorded input.
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Rebuild minimal run options so the worker receives a trace
            // context parented under the handler span.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    // Validation already ran above; skip the runtime's own pass.
    let runtime_options = WorkflowRuntimeOptions {
        validate_before_run: false,
        ..WorkflowRuntimeOptions::default()
    };
    let runtime = WorkflowRuntime::new(ir, &NoopLlm, Some(&tool_executor), runtime_options);

    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        // Runtime-side validation failure still means "fall back".
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Shape node outputs like the legacy path: {"output": <value>}, with the
    // synthetic start node stripped.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    // Assemble the execution trace and per-step timings. The IR runtime does
    // not expose per-node wall time, so elapsed_ms is reported as 0 here.
    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        let node_id = execution.node_id;
        trace.push(node_id.clone());
        let usage = node_usage_map.get(&node_id);
        if let Some(usage) = usage {
            llm_node_metrics.insert(
                node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: node_id.clone(),
            node_kind: "ir_runtime".to_string(),
            model_name: llm_node_models.get(&node_id).cloned(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        // Metadata is only attached when a trace id was resolved.
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    // Finalize the root span with run-level attributes and flush the tracer.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_langfuse_trace_input_output_attributes(
            span.as_mut(),
            workflow_input,
            &output,
            options.telemetry.payload_mode,
        );
        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(output))
}
2546
2547async fn execute_subworkflow_tool_call(
2548 payload: &Value,
2549 context: &Value,
2550 client: &SimpleAgentsClient,
2551 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2552 parent_options: &YamlWorkflowRunOptions,
2553 parent_trace_context: Option<&TraceContext>,
2554 resolved_trace_id: Option<&str>,
2555) -> Result<Value, String> {
2556 let workflow_id = payload
2557 .get("workflow_id")
2558 .and_then(Value::as_str)
2559 .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;
2560
2561 let input_context = context
2562 .get("input")
2563 .and_then(Value::as_object)
2564 .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;
2565
2566 let registry = input_context
2567 .get("workflow_registry")
2568 .and_then(Value::as_object)
2569 .ok_or_else(|| {
2570 "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
2571 .to_string()
2572 })?;
2573
2574 let workflow_path = registry
2575 .get(workflow_id)
2576 .and_then(Value::as_str)
2577 .ok_or_else(|| {
2578 format!(
2579 "workflow_registry has no entry for workflow_id '{}'",
2580 workflow_id
2581 )
2582 })?;
2583
2584 let parent_depth = input_context
2585 .get("__subgraph_depth")
2586 .and_then(Value::as_u64)
2587 .unwrap_or(0);
2588 let max_depth = input_context
2589 .get("__subgraph_max_depth")
2590 .and_then(Value::as_u64)
2591 .unwrap_or(3);
2592
2593 if parent_depth >= max_depth {
2594 return Err(format!(
2595 "run_workflow_graph depth limit reached (depth={}, max={})",
2596 parent_depth, max_depth
2597 ));
2598 }
2599
2600 let mut subworkflow_input = payload
2601 .get("input")
2602 .and_then(Value::as_object)
2603 .cloned()
2604 .unwrap_or_default();
2605
2606 if !subworkflow_input.contains_key("messages") {
2607 if let Some(messages) = input_context.get("messages") {
2608 subworkflow_input.insert("messages".to_string(), messages.clone());
2609 }
2610 }
2611
2612 if !subworkflow_input.contains_key("email_text") {
2613 if let Some(email_text) = input_context.get("email_text") {
2614 subworkflow_input.insert("email_text".to_string(), email_text.clone());
2615 }
2616 }
2617
2618 if !subworkflow_input.contains_key("workflow_registry") {
2619 subworkflow_input.insert(
2620 "workflow_registry".to_string(),
2621 Value::Object(registry.clone()),
2622 );
2623 }
2624
2625 subworkflow_input.insert(
2626 "__subgraph_depth".to_string(),
2627 Value::Number(serde_json::Number::from(parent_depth + 1)),
2628 );
2629 subworkflow_input.insert(
2630 "__subgraph_max_depth".to_string(),
2631 Value::Number(serde_json::Number::from(max_depth)),
2632 );
2633
2634 let subworkflow_options =
2635 build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);
2636
2637 let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
2638 Path::new(workflow_path),
2639 &Value::Object(subworkflow_input),
2640 client,
2641 custom_worker,
2642 None,
2643 &subworkflow_options,
2644 )
2645 .await
2646 .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;
2647
2648 Ok(json!({
2649 "workflow_id": workflow_id,
2650 "workflow_path": workflow_path,
2651 "terminal_node": output.terminal_node,
2652 "terminal_output": output.terminal_output,
2653 "trace": output.trace,
2654 }))
2655}
2656
2657fn build_subworkflow_options(
2658 parent_options: &YamlWorkflowRunOptions,
2659 parent_trace_context: Option<&TraceContext>,
2660 resolved_trace_id: Option<&str>,
2661) -> YamlWorkflowRunOptions {
2662 let mut subworkflow_options = parent_options.clone();
2663 if parent_trace_context.is_some() || resolved_trace_id.is_some() {
2664 let trace_context = YamlWorkflowTraceContextInput {
2665 trace_id: resolved_trace_id
2666 .map(|value| value.to_string())
2667 .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
2668 span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
2669 parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
2670 traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
2671 tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
2672 baggage: parent_trace_context
2673 .map(|ctx| ctx.baggage.clone())
2674 .unwrap_or_default(),
2675 };
2676 subworkflow_options.trace.context = Some(trace_context);
2677 }
2678 subworkflow_options
2679}
2680
/// A workflow definition as deserialized from YAML: a directed graph of
/// nodes plus linear edges, entered at `entry_node`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier, echoed into run outputs and trace attributes.
    pub id: String,
    /// Id of the node where execution begins.
    pub entry_node: String,
    /// All nodes in the graph (may be empty in YAML; defaults to none).
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Unconditional from -> to edges (switch branches are on the node).
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
2690
/// A single workflow node: its id, which variant it is, and optional config.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    pub id: String,
    /// Variant selector — exactly one of its option fields should be set.
    pub node_type: YamlNodeType,
    /// Node-kind-specific configuration (prompt, schema, payload, globals).
    pub config: Option<YamlNodeConfig>,
}
2697
2698impl YamlNode {
2699 fn kind_name(&self) -> &'static str {
2700 if self.node_type.llm_call.is_some() {
2701 "llm_call"
2702 } else if self.node_type.switch.is_some() {
2703 "switch"
2704 } else if self.node_type.custom_worker.is_some() {
2705 "custom_worker"
2706 } else {
2707 "unknown"
2708 }
2709 }
2710}
2711
/// Which kind of node this is. Modeled as three optional payloads rather
/// than an enum so the YAML can spell the variant as a nested key; exactly
/// one field is expected to be set (see `YamlNode::kind_name`).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
}
2718
/// Configuration of an `llm_call` node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model to request (may be overridden by run options at execution time).
    pub model: String,
    /// Stream the completion instead of waiting for the full response.
    pub stream: Option<bool>,
    /// Emit streamed JSON as plain text deltas.
    pub stream_json_as_text: Option<bool>,
    /// Run the JSON-healing parser over malformed structured output.
    pub heal: Option<bool>,
    /// Context path from which to load a pre-built message list.
    pub messages_path: Option<String>,
    /// Whether the rendered prompt is appended as a user message.
    pub append_prompt_as_user: Option<bool>,
    /// Wire format of the `tools` declarations below (defaults to OpenAI).
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tools the model may call during this node.
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Cap on model<->tool round trips for this node.
    pub max_tool_roundtrips: Option<u8>,
    /// Global key under which executed tool calls are recorded.
    pub tool_calls_global_key: Option<String>,
}
2735
/// Declaration format accepted for `YamlLlmCall::tools`.
/// Serialized in snake_case (`openai` / `simplified`); defaults to OpenAI.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}
2743
/// A tool declaration in either supported shape. `untagged` means serde
/// tries the variants in order: the OpenAI shape (with its nested
/// `function` object) first, then the flat simplified shape.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
2750
/// OpenAI-style tool declaration: `{ "type": ..., "function": { ... } }`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// The YAML key is `type` (a Rust keyword), hence the rename.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
2757
/// The `function` body of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    /// JSON Schema for the tool's arguments.
    pub parameters: Option<Value>,
    /// Optional JSON Schema for the tool's result.
    pub output_schema: Option<Value>,
}
2765
/// Flat, simplified tool declaration: name + schemas at the top level.
/// Unlike the OpenAI shape, `input_schema` is required here.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}
2773
/// Tool-choice setting in any of the accepted spellings; `untagged`, so
/// variants are tried in order: a bare mode string, a `{ function: name }`
/// shorthand, or the full OpenAI tool-choice object.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
2781
/// A `switch` node: ordered conditional branches with a mandatory
/// fallback target when no branch condition matches.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id when no branch matches.
    pub default: String,
}
2788
/// One branch of a switch: a condition expression and the node id to jump
/// to when it evaluates truthy.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}
2794
/// A `custom_worker` node: names the handler that the configured
/// `YamlWorkflowCustomWorkerExecutor` dispatches on.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}
2799
/// Per-node configuration shared across node kinds; which fields are
/// meaningful depends on the node's variant.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt template for llm_call nodes.
    pub prompt: Option<String>,
    /// Expected output JSON Schema; `schema` is accepted as a YAML alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Static payload for custom_worker nodes.
    pub payload: Option<Value>,
    /// Globals to set (name -> value template) after this node runs.
    pub set_globals: Option<HashMap<String, String>>,
    /// Globals to mutate via an operation (see `YamlGlobalUpdate`).
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
2809
/// A mutation applied to a workflow global: an operation name plus its
/// operands (`from` for path-sourced values, `by` for numeric deltas —
/// which one is used depends on `op`).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    pub op: String,
    pub from: Option<String>,
    pub by: Option<f64>,
}
2816
/// An unconditional directed edge between two node ids.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
2822
2823#[cfg(test)]
2824mod tests {
2825 use super::*;
2826 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
2827 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
2828 use simple_agent_type::tool::{ToolCallFunction, ToolType};
2829 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
2830 use simple_agents_core::SimpleAgentsClientBuilder;
2831 use std::collections::BTreeMap;
2832 use std::fs;
2833 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2834 use std::sync::{Arc, Mutex, OnceLock};
2835
2836 fn stream_debug_env_lock() -> &'static Mutex<()> {
2837 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2838 LOCK.get_or_init(|| Mutex::new(()))
2839 }
2840
    // LLM executor stub whose responses are keyed off the prompt text.
    struct MockExecutor;

    // Event sink that stores every emitted event for later assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    // Event sink that flips its cancellation flag on the first event.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }
2850
    // Requests cancellation as soon as any event is observed; the workflow
    // loop polls `is_cancelled` and stops at its next checkpoint.
    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }
2860
    // LLM executor stub that counts how many times it was invoked.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    // Custom worker that captures the context it was handed.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    // Provider that first returns a tool call, then a final answer.
    struct ToolLoopProvider;

    // Provider that always requests a tool the workflow does not know.
    struct UnknownToolProvider;

    // Provider whose usage report includes reasoning tokens.
    struct ReasoningUsageProvider;

    // Tool-loop provider variant that also reports reasoning tokens.
    struct ToolLoopReasoningProvider;

    // Span that records set attributes so tests can inspect them.
    #[derive(Default)]
    struct CapturingSpan {
        attributes: BTreeMap<String, String>,
    }
2881
    // Records attributes into the map; events and `end` are no-ops since
    // tests only assert on attributes.
    impl WorkflowSpan for CapturingSpan {
        fn set_attribute(&mut self, key: &str, value: &str) {
            self.attributes.insert(key.to_string(), value.to_string());
        }

        fn add_event(&mut self, _name: &str) {}

        fn end(self: Box<Self>) {}
    }
2891
2892 #[async_trait]
2893 impl Provider for ToolLoopProvider {
2894 fn name(&self) -> &str {
2895 "openai"
2896 }
2897
2898 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
2899 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
2900 Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
2901 }
2902
2903 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
2904 let request: CompletionRequest =
2905 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
2906
2907 let has_tools = request
2908 .tools
2909 .as_ref()
2910 .is_some_and(|tools| !tools.is_empty());
2911 let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
2912
2913 let response = if has_tools && !has_tool_result {
2914 CompletionResponse {
2915 id: "resp_tool_1".to_string(),
2916 model: request.model.clone(),
2917 choices: vec![CompletionChoice {
2918 index: 0,
2919 message: Message::assistant("").with_tool_calls(vec![ToolCall {
2920 id: "call_get_context".to_string(),
2921 tool_type: ToolType::Function,
2922 function: ToolCallFunction {
2923 name: "get_customer_context".to_string(),
2924 arguments: "{\"order_id\":\"123\"}".to_string(),
2925 },
2926 }]),
2927 finish_reason: FinishReason::ToolCalls,
2928 logprobs: None,
2929 }],
2930 usage: Usage::new(10, 5),
2931 created: None,
2932 provider: Some(self.name().to_string()),
2933 healing_metadata: None,
2934 }
2935 } else if has_tools && has_tool_result {
2936 CompletionResponse {
2937 id: "resp_tool_2".to_string(),
2938 model: request.model.clone(),
2939 choices: vec![CompletionChoice {
2940 index: 0,
2941 message: Message::assistant("{\"state\":\"done\"}"),
2942 finish_reason: FinishReason::Stop,
2943 logprobs: None,
2944 }],
2945 usage: Usage::new(12, 6),
2946 created: None,
2947 provider: Some(self.name().to_string()),
2948 healing_metadata: None,
2949 }
2950 } else {
2951 let prompt = request
2952 .messages
2953 .iter()
2954 .rev()
2955 .find(|m| m.role == Role::User)
2956 .map(|m| m.content.clone())
2957 .unwrap_or_default();
2958 let payload = json!({
2959 "subject": "ok",
2960 "body": prompt,
2961 })
2962 .to_string();
2963 CompletionResponse {
2964 id: "resp_final".to_string(),
2965 model: request.model.clone(),
2966 choices: vec![CompletionChoice {
2967 index: 0,
2968 message: Message::assistant(payload),
2969 finish_reason: FinishReason::Stop,
2970 logprobs: None,
2971 }],
2972 usage: Usage::new(8, 4),
2973 created: None,
2974 provider: Some(self.name().to_string()),
2975 healing_metadata: None,
2976 }
2977 };
2978
2979 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
2980 Ok(ProviderResponse::new(200, body))
2981 }
2982
2983 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
2984 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
2985 }
2986 }
2987
2988 #[async_trait]
2989 impl Provider for UnknownToolProvider {
2990 fn name(&self) -> &str {
2991 "openai"
2992 }
2993
2994 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
2995 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
2996 Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
2997 }
2998
2999 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3000 let request: CompletionRequest =
3001 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3002
3003 let response = CompletionResponse {
3004 id: "resp_unknown_tool".to_string(),
3005 model: request.model,
3006 choices: vec![CompletionChoice {
3007 index: 0,
3008 message: Message::assistant("").with_tool_calls(vec![ToolCall {
3009 id: "call_unknown".to_string(),
3010 tool_type: ToolType::Function,
3011 function: ToolCallFunction {
3012 name: "unknown_tool".to_string(),
3013 arguments: "{}".to_string(),
3014 },
3015 }]),
3016 finish_reason: FinishReason::ToolCalls,
3017 logprobs: None,
3018 }],
3019 usage: Usage::new(5, 2),
3020 created: None,
3021 provider: Some(self.name().to_string()),
3022 healing_metadata: None,
3023 };
3024
3025 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3026 Ok(ProviderResponse::new(200, body))
3027 }
3028
3029 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3030 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3031 }
3032 }
3033
3034 #[async_trait]
3035 impl Provider for ReasoningUsageProvider {
3036 fn name(&self) -> &str {
3037 "openai"
3038 }
3039
3040 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
3041 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
3042 Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
3043 }
3044
3045 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3046 let request: CompletionRequest =
3047 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3048
3049 let mut usage = Usage::new(9, 5);
3050 usage.reasoning_tokens = Some(4);
3051 let response = CompletionResponse {
3052 id: "resp_reasoning".to_string(),
3053 model: request.model,
3054 choices: vec![CompletionChoice {
3055 index: 0,
3056 message: Message::assistant("{\"state\":\"ok\"}"),
3057 finish_reason: FinishReason::Stop,
3058 logprobs: None,
3059 }],
3060 usage,
3061 created: None,
3062 provider: Some(self.name().to_string()),
3063 healing_metadata: None,
3064 };
3065
3066 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3067 Ok(ProviderResponse::new(200, body))
3068 }
3069
3070 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3071 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3072 }
3073 }
3074
3075 #[async_trait]
3076 impl Provider for ToolLoopReasoningProvider {
3077 fn name(&self) -> &str {
3078 "openai"
3079 }
3080
3081 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
3082 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
3083 Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
3084 }
3085
3086 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
3087 let request: CompletionRequest =
3088 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
3089
3090 let has_tools = request
3091 .tools
3092 .as_ref()
3093 .is_some_and(|tools| !tools.is_empty());
3094 let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
3095
3096 let response = if has_tools && !has_tool_result {
3097 let mut usage = Usage::new(10, 5);
3098 usage.reasoning_tokens = Some(2);
3099 CompletionResponse {
3100 id: "resp_tool_reasoning_1".to_string(),
3101 model: request.model.clone(),
3102 choices: vec![CompletionChoice {
3103 index: 0,
3104 message: Message::assistant("").with_tool_calls(vec![ToolCall {
3105 id: "call_get_context".to_string(),
3106 tool_type: ToolType::Function,
3107 function: ToolCallFunction {
3108 name: "get_customer_context".to_string(),
3109 arguments: "{\"order_id\":\"123\"}".to_string(),
3110 },
3111 }]),
3112 finish_reason: FinishReason::ToolCalls,
3113 logprobs: None,
3114 }],
3115 usage,
3116 created: None,
3117 provider: Some(self.name().to_string()),
3118 healing_metadata: None,
3119 }
3120 } else {
3121 let mut usage = Usage::new(12, 6);
3122 usage.reasoning_tokens = Some(3);
3123 CompletionResponse {
3124 id: "resp_tool_reasoning_2".to_string(),
3125 model: request.model,
3126 choices: vec![CompletionChoice {
3127 index: 0,
3128 message: Message::assistant("{\"state\":\"done\"}"),
3129 finish_reason: FinishReason::Stop,
3130 logprobs: None,
3131 }],
3132 usage,
3133 created: None,
3134 provider: Some(self.name().to_string()),
3135 healing_metadata: None,
3136 }
3137 };
3138
3139 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
3140 Ok(ProviderResponse::new(200, body))
3141 }
3142
3143 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
3144 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
3145 }
3146 }
3147
    // Worker that answers every tool invocation with a fixed payload.
    struct FixedToolWorker {
        payload: Value,
    }

    // Worker that counts how often it is executed.
    struct CountingToolWorker {
        execute_calls: AtomicUsize,
    }
3155
3156 #[async_trait]
3157 impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
3158 async fn execute(
3159 &self,
3160 _handler: &str,
3161 _payload: &Value,
3162 _email_text: &str,
3163 _context: &Value,
3164 ) -> Result<Value, String> {
3165 Ok(self.payload.clone())
3166 }
3167 }
3168
3169 #[async_trait]
3170 impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
3171 async fn execute(
3172 &self,
3173 _handler: &str,
3174 _payload: &Value,
3175 _email_text: &str,
3176 _context: &Value,
3177 ) -> Result<Value, String> {
3178 self.execute_calls.fetch_add(1, Ordering::SeqCst);
3179 Ok(json!({"ok": true}))
3180 }
3181 }
3182
3183 #[async_trait]
3184 impl YamlWorkflowLlmExecutor for CountingExecutor {
3185 async fn complete_structured(
3186 &self,
3187 _request: YamlLlmExecutionRequest,
3188 _event_sink: Option<&dyn YamlWorkflowEventSink>,
3189 ) -> Result<YamlLlmExecutionResult, String> {
3190 self.call_count.fetch_add(1, Ordering::SeqCst);
3191 Ok(YamlLlmExecutionResult {
3192 payload: json!({"state":"ok"}),
3193 usage: None,
3194 ttft_ms: None,
3195 tool_calls: Vec::new(),
3196 })
3197 }
3198 }
3199
    // Appends every emitted event to the guarded vector for assertions.
    impl YamlWorkflowEventSink for RecordingSink {
        fn emit(&self, event: &YamlWorkflowEvent) {
            self.events
                .lock()
                .expect("recording sink lock should not be poisoned")
                .push(event.clone());
        }
    }
3208
3209 #[async_trait]
3210 impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
3211 async fn execute(
3212 &self,
3213 _handler: &str,
3214 _payload: &Value,
3215 _email_text: &str,
3216 context: &Value,
3217 ) -> Result<Value, String> {
3218 let mut guard = self
3219 .context
3220 .lock()
3221 .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
3222 *guard = Some(context.clone());
3223 Ok(json!({"ok": true}))
3224 }
3225 }
3226
3227 #[async_trait]
3228 impl YamlWorkflowLlmExecutor for MockExecutor {
3229 async fn complete_structured(
3230 &self,
3231 request: YamlLlmExecutionRequest,
3232 _event_sink: Option<&dyn YamlWorkflowEventSink>,
3233 ) -> Result<YamlLlmExecutionResult, String> {
3234 let prompt = request.prompt;
3235 if prompt.contains("exactly one category") {
3236 return Ok(YamlLlmExecutionResult {
3237 payload: json!({"category":"termination","reason":"mock"}),
3238 usage: Some(YamlLlmTokenUsage {
3239 prompt_tokens: 10,
3240 completion_tokens: 5,
3241 total_tokens: 15,
3242 reasoning_tokens: None,
3243 }),
3244 ttft_ms: None,
3245 tool_calls: Vec::new(),
3246 });
3247 }
3248 if prompt.contains("Determine termination subtype") {
3249 return Ok(YamlLlmExecutionResult {
3250 payload: json!({"subtype":"repeated_offense","reason":"mock"}),
3251 usage: Some(YamlLlmTokenUsage {
3252 prompt_tokens: 12,
3253 completion_tokens: 6,
3254 total_tokens: 18,
3255 reasoning_tokens: None,
3256 }),
3257 ttft_ms: None,
3258 tool_calls: Vec::new(),
3259 });
3260 }
3261 if prompt.contains("Determine supply chain subtype") {
3262 return Ok(YamlLlmExecutionResult {
3263 payload: json!({"subtype":"order_replacement","reason":"mock"}),
3264 usage: Some(YamlLlmTokenUsage {
3265 prompt_tokens: 11,
3266 completion_tokens: 4,
3267 total_tokens: 15,
3268 reasoning_tokens: None,
3269 }),
3270 ttft_ms: None,
3271 tool_calls: Vec::new(),
3272 });
3273 }
3274 Err("unexpected prompt".to_string())
3275 }
3276 }
3277
// End-to-end run: classification + switch routing reaches the repeated-offense
// RAG node, and per-step timings plus token totals are aggregated.
#[tokio::test]
async fn runs_yaml_workflow_and_returns_step_timings() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Determine termination subtype:
            {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let custom_worker = FixedToolWorker {
        payload: json!({"context": "mock"}),
    };
    let output = run_email_workflow_yaml_with_custom_worker(
        &workflow,
        "test",
        &MockExecutor,
        Some(&custom_worker),
    )
    .await
    .expect("yaml workflow should execute");

    assert_eq!(output.workflow_id, "email-intake-classification");
    assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
    assert!(!output.step_timings.is_empty());
    // Every traced node should have a corresponding timing entry.
    assert_eq!(output.step_timings.len(), output.trace.len());
    assert!(output
        .outputs
        .contains_key("rag_termination_repeated_offense"));
    // MockExecutor runs twice: 10+12 prompt, 5+6 completion, 15+18 total tokens.
    assert_eq!(output.total_input_tokens, 22);
    assert_eq!(output.total_output_tokens, 11);
    assert_eq!(output.total_tokens, 33);
}
3359
// The builder-style `WorkflowRunner` can run an inline workflow when given an
// email text and an executor.
#[tokio::test]
async fn workflow_runner_executes_inline_workflow_with_executor() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let output = WorkflowRunner::from_workflow(&workflow)
        .with_email_text("test")
        .with_executor(&MockExecutor)
        .run()
        .await
        .expect("builder execution should succeed");

    assert_eq!(output.workflow_id, "email-intake-classification");
    assert!(output.outputs.contains_key("classify_top_level"));
}
3387
// Running via `WorkflowRunner` without configuring an executor must fail with
// an `InvalidInput` error mentioning the missing executor.
#[tokio::test]
async fn workflow_runner_requires_executor() {
    let yaml = r#"
id: missing-executor
entry_node: end
nodes:
  - id: end
    node_type:
      end: {}
"#;
    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");

    let err = WorkflowRunner::from_workflow(&workflow)
        .with_input(&json!({"email_text":"x"}))
        .run()
        .await
        .expect_err("missing executor should fail");

    assert!(matches!(
        err,
        YamlWorkflowRunError::InvalidInput { message }
            if message.contains("workflow executor is required")
    ));
}
3412
// The telemetry sink receives a `node_llm_input_resolved` event whose metadata
// exposes the resolved prompt and the template-binding details.
#[tokio::test]
async fn emits_resolved_llm_input_event_with_bindings() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_email_workflow_yaml_with_custom_worker_and_events(
        &workflow,
        "Need help with termination",
        &MockExecutor,
        None,
        Some(&sink),
    )
    .await
    .expect("yaml workflow should execute");

    assert_eq!(output.terminal_node, "classify_top_level");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let llm_event = events
        .iter()
        .find(|event| event.event_type == "node_llm_input_resolved")
        .expect("expected llm input telemetry event");

    let metadata = llm_event
        .metadata
        .as_ref()
        .expect("llm input event must include metadata");
    assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
    assert_eq!(metadata["stream_requested"], Value::Bool(false));
    assert_eq!(metadata["heal_requested"], Value::Bool(false));
    // The prompt must contain the interpolated email text.
    assert!(metadata["prompt"]
        .as_str()
        .expect("prompt should be a string")
        .contains("Need help with termination"));

    // Exactly one template binding: input.email_text, resolved to a string.
    let bindings = metadata["bindings"]
        .as_array()
        .expect("bindings should be an array");
    assert_eq!(bindings.len(), 1);
    assert_eq!(
        bindings[0]["source_path"],
        Value::String("input.email_text".to_string())
    );
    assert_eq!(
        bindings[0]["resolved"],
        Value::String("Need help with termination".to_string())
    );
    assert_eq!(bindings[0]["missing"], Value::Bool(false));
    assert_eq!(
        bindings[0]["resolved_type"],
        Value::String("string".to_string())
    );
}
3485
// With default run options, the `workflow_completed` event carries a
// `nerdstats` metadata block summarizing tokens and per-step details.
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_by_default() {
    let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // Nerdstats mirrors the run output's identifiers and token totals.
    assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
    assert_eq!(
        nerdstats["terminal_node"],
        Value::String(output.terminal_node)
    );
    assert_eq!(
        nerdstats["total_tokens"],
        Value::Number(output.total_tokens.into())
    );
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_usage".to_string())
    );
    // Legacy keys are absent; `step_details` is the consolidated replacement.
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert!(nerdstats.get("step_details").is_some());
    assert!(nerdstats.get("llm_node_models").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
}
3562
// When the telemetry option `nerdstats` is disabled, the `workflow_completed`
// event has no metadata at all.
#[tokio::test]
async fn workflow_completed_event_omits_nerdstats_when_disabled() {
    let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };
    // Only nerdstats is turned off; all other telemetry defaults are kept.
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            nerdstats: false,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &options,
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    assert!(completed.metadata.is_none());
}
3612
/// Mock executor that only reports a TTFT when streaming was requested.
struct StreamAwareMockExecutor;
3614
#[async_trait]
impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
    /// Returns a fixed payload and usage; `ttft_ms` is populated only when the
    /// request asked for streaming, letting tests verify stream plumbing.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        _event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String> {
        Ok(YamlLlmExecutionResult {
            payload: json!({"state":"ok"}),
            usage: Some(YamlLlmTokenUsage {
                prompt_tokens: 20,
                completion_tokens: 10,
                total_tokens: 30,
                reasoning_tokens: None,
            }),
            // TTFT is only meaningful for streamed responses.
            ttft_ms: if request.stream { Some(12) } else { None },
            tool_calls: Vec::new(),
        })
    }
}
3635
// Streaming LLM nodes still contribute token usage and a TTFT to nerdstats
// when the provider reports usage for the stream.
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
    let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &StreamAwareMockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // StreamAwareMockExecutor reports usage (30 total) and ttft (12) on stream.
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
    assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
}
3699
// An LLM step with no usage data makes `workflow_nerdstats` mark token
// metrics unavailable and attribute the gap to the stream provider.
#[test]
fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
    // Hand-built run output: one LLM node that reported no token counts.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1".to_string()),
            elapsed_ms: 100,
            prompt_tokens: None,
            completion_tokens: None,
            total_tokens: None,
            reasoning_tokens: None,
            tokens_per_second: None,
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: None,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_tokens: 0,
        total_reasoning_tokens: None,
        tokens_per_second: 0.0,
        trace_id: Some("trace-1".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_stream_usage_unavailable".to_string())
    );
    // Token fields collapse to null rather than reporting a misleading zero.
    assert_eq!(nerdstats["total_tokens"], Value::Null);
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
}
3753
// When the run output carries a TTFT, `workflow_nerdstats` surfaces it.
#[test]
fn workflow_nerdstats_includes_ttft_when_available() {
    // Hand-built run output: one LLM node with full usage and a measured TTFT.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1".to_string()),
            elapsed_ms: 100,
            prompt_tokens: Some(10),
            completion_tokens: Some(15),
            total_tokens: Some(25),
            reasoning_tokens: None,
            tokens_per_second: Some(150.0),
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(42),
        total_input_tokens: 10,
        total_output_tokens: 15,
        total_tokens: 25,
        total_reasoning_tokens: None,
        tokens_per_second: 150.0,
        trace_id: Some("trace-2".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
}
3801
// Golden test: the full nerdstats JSON shape is part of the telemetry contract
// and must not drift without an intentional change here.
#[test]
fn workflow_nerdstats_schema_contract_is_stable() {
    // Two-step run: one LLM node with complete usage, one non-LLM switch node.
    let output = YamlWorkflowRunOutput {
        workflow_id: "schema-workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["classify".to_string(), "route".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "route".to_string(),
        terminal_output: None,
        step_timings: vec![
            YamlStepTiming {
                node_id: "classify".to_string(),
                node_kind: "llm_call".to_string(),
                model_name: Some("gpt-4.1".to_string()),
                elapsed_ms: 100,
                prompt_tokens: Some(11),
                completion_tokens: Some(22),
                total_tokens: Some(33),
                reasoning_tokens: Some(7),
                tokens_per_second: Some(220.0),
            },
            YamlStepTiming {
                node_id: "route".to_string(),
                node_kind: "switch".to_string(),
                model_name: None,
                elapsed_ms: 0,
                prompt_tokens: None,
                completion_tokens: None,
                total_tokens: None,
                reasoning_tokens: None,
                tokens_per_second: None,
            },
        ],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(9),
        total_input_tokens: 11,
        total_output_tokens: 22,
        total_tokens: 33,
        total_reasoning_tokens: Some(7),
        tokens_per_second: 220.0,
        trace_id: Some("trace-schema".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    // Note: None-valued step fields (e.g. the switch node's model_name) are
    // omitted from step_details rather than serialized as null.
    let expected = json!({
        "workflow_id": "schema-workflow",
        "terminal_node": "route",
        "total_elapsed_ms": 100,
        "ttft_ms": 9,
        "step_details": [
            {
                "node_id": "classify",
                "node_kind": "llm_call",
                "model_name": "gpt-4.1",
                "elapsed_ms": 100,
                "prompt_tokens": 11,
                "completion_tokens": 22,
                "total_tokens": 33,
                "reasoning_tokens": 7,
                "tokens_per_second": 220.0
            },
            {
                "node_id": "route",
                "node_kind": "switch",
                "elapsed_ms": 0
            }
        ],
        "total_input_tokens": 11,
        "total_output_tokens": 22,
        "total_tokens": 33,
        "total_reasoning_tokens": 7,
        "tokens_per_second": 220.0,
        "trace_id": "trace-schema",
        "token_metrics_available": true,
        "token_metrics_source": "provider_usage",
        "llm_nodes_without_usage": []
    });

    assert_eq!(nerdstats, expected);
}
3886
/// Mock executor that requires an explicit two-message chat history.
struct MessageHistoryExecutor;
3888
3889 #[async_trait]
3890 impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
3891 async fn complete_structured(
3892 &self,
3893 request: YamlLlmExecutionRequest,
3894 _event_sink: Option<&dyn YamlWorkflowEventSink>,
3895 ) -> Result<YamlLlmExecutionResult, String> {
3896 let messages = request
3897 .messages
3898 .ok_or_else(|| "expected messages in request".to_string())?;
3899 if messages.len() != 2 {
3900 return Err(format!("expected 2 messages, got {}", messages.len()));
3901 }
3902 Ok(YamlLlmExecutionResult {
3903 payload: json!({"category":"termination","reason":"history"}),
3904 usage: Some(YamlLlmTokenUsage {
3905 prompt_tokens: 7,
3906 completion_tokens: 3,
3907 total_tokens: 10,
3908 reasoning_tokens: None,
3909 }),
3910 ttft_ms: None,
3911 tool_calls: Vec::new(),
3912 })
3913 }
3914 }
3915
// An `llm_call` node can source its chat history from `input.messages` via
// `messages_path`, bypassing the prompt entirely.
#[tokio::test]
async fn supports_messages_path_in_workflow_input() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // `email_text` should be ignored; the executor asserts on `messages`.
    let input = json!({
        "email_text": "ignored",
        "messages": [
            {"role": "system", "content": "You are a classifier"},
            {"role": "user", "content": "Please classify this"}
        ]
    });

    let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
        .await
        .expect("workflow should use chat history from input");

    assert_eq!(output.terminal_node, "classify_top_level");
    assert_eq!(
        output.outputs["classify_top_level"]["output"]["reason"],
        Value::String("history".to_string())
    );
}
3949
// The base entrypoint and its wrapper variants must produce identical results
// for the same workflow and input.
#[tokio::test]
async fn wrapper_entrypoints_produce_equivalent_outputs() {
    let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({"email_text":"hello"});

    // a: plain entrypoint; b: + custom worker; c: + events and options.
    let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
        .await
        .expect("base entrypoint should execute");
    let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
        .await
        .expect("custom worker wrapper should execute");
    let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &input,
        &MockExecutor,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("events/options wrapper should execute");

    assert_eq!(a.workflow_id, b.workflow_id);
    assert_eq!(a.workflow_id, c.workflow_id);
    assert_eq!(a.terminal_node, b.terminal_node);
    assert_eq!(a.terminal_node, c.terminal_node);
    assert_eq!(a.outputs, b.outputs);
    assert_eq!(a.outputs, c.outputs);
    assert_eq!(a.total_tokens, b.total_tokens);
    assert_eq!(a.total_tokens, c.total_tokens);
}
3995
// Tool-calling flow: the first node's tool results are recorded in its
// `tool_calls` trace and published under the `audit` global, which a later
// node can reference via `globals.audit.0.output.*` template paths.
#[tokio::test]
async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
    let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    // The worker's payload stands in for the tool's output.
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
    assert_eq!(
        output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
        Value::String("Ava".to_string())
    );
    // The global reference must have been resolved into the second prompt.
    let body = output.outputs["personalize"]["output"]["body"]
        .as_str()
        .expect("body should be string");
    assert!(body.contains("Ava"));
}
4078
// Reasoning tokens reported by the provider flow through to the run output,
// per-step timings, and the nerdstats summary.
#[tokio::test]
async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
    let yaml = r#"
id: reasoning-usage-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ReasoningUsageProvider))
        .build()
        .expect("client should build");

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    // ReasoningUsageProvider reports 4 reasoning tokens for its single call.
    assert_eq!(output.total_reasoning_tokens, Some(4));
    assert_eq!(output.step_timings.len(), 1);
    assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(
        nerdstats["total_reasoning_tokens"],
        Value::Number(4u64.into())
    );
    assert_eq!(
        nerdstats["step_details"][0]["reasoning_tokens"],
        Value::Number(4u64.into())
    );
}
4131
// Reasoning tokens from every LLM roundtrip of a tool-calling node are summed
// into a single per-step total.
#[tokio::test]
async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
    let yaml = r#"
id: tool-reasoning-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopReasoningProvider))
        .build()
        .expect("client should build");
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    // ToolLoopReasoningProvider reports 5 reasoning tokens across roundtrips.
    assert_eq!(output.total_reasoning_tokens, Some(5));
    assert_eq!(output.step_timings.len(), 1);
    assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(
        nerdstats["total_reasoning_tokens"],
        Value::Number(5u64.into())
    );
    assert_eq!(
        nerdstats["step_details"][0]["reasoning_tokens"],
        Value::Number(5u64.into())
    );
}
4200
// A tool result that violates the tool's declared output_schema hard-fails
// the node with an Llm error rather than being silently accepted.
#[tokio::test]
async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
    let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    // Payload intentionally lacks the required `customer_name` field.
    let worker = FixedToolWorker {
        payload: json!({"unexpected": "shape"}),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should hard-fail on schema mismatch");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("output failed schema validation"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }
}
4262
// If the model requests a tool that is not declared on the node, the run fails
// before the custom worker is ever invoked (counter stays at zero).
#[tokio::test]
async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
    let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // UnknownToolProvider emits a call to a tool name the node never declared.
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(UnknownToolProvider))
        .build()
        .expect("client should build");
    let worker = CountingToolWorker {
        execute_calls: AtomicUsize::new(0),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should reject unknown tool before executing worker");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("model requested unknown tool 'unknown_tool'"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }

    // The rejection must happen before any worker execution.
    assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
}
4319
4320 #[test]
4321 fn validates_tools_format_mismatch() {
4322 let yaml = r#"
4323id: mismatch
4324entry_node: generate
4325nodes:
4326 - id: generate
4327 node_type:
4328 llm_call:
4329 model: gpt-4.1
4330 tools_format: openai
4331 tools:
4332 - name: get_customer_context
4333 input_schema:
4334 type: object
4335 properties:
4336 order_id: { type: string }
4337 required: [order_id]
4338"#;
4339
4340 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4341 let diagnostics = verify_yaml_workflow(&workflow);
4342 assert!(diagnostics
4343 .iter()
4344 .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
4345 }
4346
    // Running a workflow whose entry node is a custom_worker without supplying
    // a worker executor must fail with a CustomWorker error naming the node.
    #[tokio::test]
    async fn custom_worker_node_requires_executor() {
        let yaml = r#"
id: missing-custom-worker
entry_node: enrich
nodes:
  - id: enrich
    node_type:
      custom_worker:
        handler: GetEmployeeRecord
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");

        // Fourth argument (custom worker) deliberately None.
        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text": "hello"}),
            &client,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should fail without custom worker executor");

        match error {
            YamlWorkflowRunError::CustomWorker { node_id, message } => {
                assert_eq!(node_id, "enrich");
                assert!(message.contains("requires a configured custom worker executor"));
            }
            other => panic!("expected custom worker error, got {other:?}"),
        }
    }
4384
    // A custom worker must receive the trace context (trace_id, traceparent)
    // and tenant block under the "trace" key of its invocation context.
    #[tokio::test]
    async fn custom_worker_receives_trace_context_block() {
        let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: demo
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Worker stores whatever context it is handed so we can inspect it.
        let worker = CapturingWorker {
            context: Mutex::new(None),
        };
        let options = YamlWorkflowRunOptions {
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-ctx".to_string()),
                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                    request_id: Some("turn-7".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            Some(&worker),
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        let captured_context = worker
            .context
            .lock()
            .expect("capturing worker lock should not be poisoned")
            .clone()
            .expect("custom worker should receive context");

        // trace.context carries the ids exactly as supplied in the options.
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("trace_id"))
                .and_then(Value::as_str),
            Some("trace-fixed-ctx")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("traceparent"))
                .and_then(Value::as_str),
            Some("00-trace-fixed-ctx-span-fixed-01")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
        );
    }
4463
    // When the event sink signals cancellation on the first event, the run
    // must abort with EventSinkCancelled before the LLM executor is called.
    #[tokio::test]
    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
        let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Counts executor invocations so we can assert it was never reached.
        let executor = CountingExecutor {
            call_count: AtomicUsize::new(0),
        };
        let sink = CancelAfterFirstEventSink {
            cancelled: AtomicBool::new(false),
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &executor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should stop when sink signals cancellation");

        assert!(matches!(
            err,
            YamlWorkflowRunError::EventSinkCancelled { .. }
        ));
        // The LLM executor must never have been invoked.
        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
    }
4505
    // messages_path resolving to a non-array value must fail the run with an
    // Llm error rather than silently coercing the value.
    #[tokio::test]
    async fn rejects_invalid_messages_path_shape() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // "messages" is a string, not the list of messages the path expects.
        let input = json!({
            "email_text": "ignored",
            "messages": "not-a-list"
        });

        let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
            .await
            .expect_err("workflow should fail for invalid messages shape");

        assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
    }
4531
    // Mermaid rendering labels switch branches ("route1", "default") and
    // keeps plain edges unlabeled.
    #[test]
    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
        let yaml = r#"
id: chat-workflow
entry_node: decide
nodes:
  - id: decide
    node_type:
      switch:
        branches:
          - condition: '$.input.mode == "draft"'
            target: draft
        default: ask
  - id: draft
    node_type:
      llm_call:
        model: gpt-4.1
  - id: ask
    node_type:
      llm_call:
        model: gpt-4.1
edges:
  - from: draft
    to: ask
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let mermaid = yaml_workflow_to_mermaid(&workflow);

        assert!(mermaid.contains("flowchart TD"));
        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
        assert!(mermaid.contains("decide -- \"default\" --> ask"));
        assert!(mermaid.contains("draft --> ask"));
    }
4566
    // Inline tools render as synthetic "<node>__tool_<i>" nodes connected by a
    // dotted edge and styled via the toolNode classDef.
    #[test]
    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
        let yaml = r#"
id: tool-graph
entry_node: chat
nodes:
  - id: chat
    node_type:
      llm_call:
        model: gemini-3-flash
        tools_format: simplified
        tools:
          - name: run_workflow_graph
            input_schema:
              type: object
              properties:
                workflow_id: { type: string }
              required: [workflow_id]
edges: []
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let mermaid = yaml_workflow_to_mermaid(&workflow);

        assert!(mermaid.contains("chat__tool_0"));
        assert!(mermaid.contains("chat -.-> chat__tool_0"));
        assert!(mermaid.contains("classDef toolNode"));
        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
    }
4596
4597 #[test]
4598 fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
4599 let base_dir = std::env::temp_dir().join(format!(
4600 "simple_agents_mermaid_subgraph_{}",
4601 std::time::SystemTime::now()
4602 .duration_since(std::time::UNIX_EPOCH)
4603 .expect("unix epoch")
4604 .as_nanos()
4605 ));
4606 fs::create_dir_all(&base_dir).expect("temp dir should be created");
4607
4608 let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
4609 let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
4610
4611 let orchestrator_yaml = r#"
4612id: email-chat-orchestrator-with-subgraph-tool
4613entry_node: respond_casual
4614nodes:
4615 - id: respond_casual
4616 node_type:
4617 llm_call:
4618 model: gemini-3-flash
4619 tools_format: simplified
4620 tools:
4621 - name: run_workflow_graph
4622 input_schema:
4623 type: object
4624 properties:
4625 workflow_id: { type: string }
4626 required: [workflow_id]
4627 config:
4628 prompt: |
4629 Call with:
4630 {
4631 "workflow_id": "hr_warning_email_subgraph"
4632 }
4633edges: []
4634"#;
4635
4636 let subgraph_yaml = r#"
4637id: hr-warning-email-subgraph
4638entry_node: draft_hr_warning_email
4639nodes:
4640 - id: draft_hr_warning_email
4641 node_type:
4642 llm_call:
4643 model: gemini-3-flash
4644edges: []
4645"#;
4646
4647 fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
4648 fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
4649
4650 let mermaid =
4651 yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
4652
4653 assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
4654 assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
4655 assert!(mermaid.contains("calls hr_warning_email_subgraph"));
4656 assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
4657
4658 fs::remove_dir_all(base_dir).expect("temp dir removed");
4659 }
4660
4661 #[tokio::test]
4662 async fn rejects_non_yaml_workflow_file_extension() {
4663 let file_path = std::env::temp_dir().join(format!(
4664 "simple_agents_workflow_{}.txt",
4665 std::time::SystemTime::now()
4666 .duration_since(std::time::UNIX_EPOCH)
4667 .expect("unix epoch")
4668 .as_nanos()
4669 ));
4670 fs::write(&file_path, "not yaml").expect("temporary file should be created");
4671
4672 let result =
4673 run_workflow_yaml_file(&file_path, &json!({"email_text": "hello"}), &MockExecutor)
4674 .await;
4675
4676 match result {
4677 Err(YamlWorkflowRunError::FileRejected { path, reason }) => {
4678 assert!(path.ends_with(".txt"));
4679 assert!(reason.contains(".yaml or .yml"));
4680 }
4681 other => panic!("expected file rejection error, got {other:?}"),
4682 }
4683
4684 let _ = fs::remove_file(&file_path);
4685 }
4686
    // YAML -> IR conversion preserves all declared nodes and injects a
    // synthetic "__yaml_start" entry node ahead of the YAML entry point.
    #[test]
    fn converts_yaml_workflow_to_ir_definition() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        classify
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.kind == "x"'
            target: done
        default: done
  - id: done
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: test
edges:
  - from: classify
    to: route
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");

        assert_eq!(ir.name, "chat-workflow");
        // Synthetic start node injected by the converter.
        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
        assert!(ir.nodes.iter().any(|n| n.id == "route"));
        assert!(ir.nodes.iter().any(|n| n.id == "done"));
    }
4728
    // llm_call nodes using messages_path are lowered to an IR Tool node backed
    // by the built-in "__yaml_llm_call" tool.
    #[test]
    fn supports_yaml_to_ir_when_messages_path_is_used() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir =
            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
        assert!(ir.nodes.iter().any(|node| matches!(
            node.kind,
            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
        )));
    }
4750
    // The run output exposes the trace id both as `output.trace_id` and
    // mirrored under `metadata.telemetry.trace_id`; the two must agree.
    #[tokio::test]
    async fn workflow_output_contains_trace_id_in_both_locations() {
        let yaml = r#"
id: trace-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
            .await
            .expect("workflow should execute");

        let trace_id = output
            .trace_id
            .as_deref()
            .expect("trace_id should be present");
        assert!(!trace_id.is_empty());
        // Same id mirrored in the telemetry metadata block.
        assert_eq!(
            output.metadata.as_ref().and_then(|value| {
                value
                    .get("telemetry")
                    .and_then(|telemetry| telemetry.get("trace_id"))
                    .and_then(Value::as_str)
            }),
            Some(trace_id)
        );
    }
4787
    // A telemetry sample_rate of 0.0 must keep the caller-supplied trace id
    // but mark the trace as unsampled in the telemetry metadata.
    #[tokio::test]
    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
        let yaml = r#"
id: sample-rate-zero
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-zero".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
4842
4843 #[test]
4844 fn trace_id_from_traceparent_parses_w3c_header() {
4845 let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
4846 assert_eq!(
4847 trace_id_from_traceparent(traceparent).as_deref(),
4848 Some("4bf92f3577b34da6a3ce929d0e0e4736")
4849 );
4850 }
4851
4852 #[test]
4853 fn resolve_telemetry_context_marks_traceparent_source() {
4854 let mut options = YamlWorkflowRunOptions::default();
4855 options.trace.context = Some(YamlWorkflowTraceContextInput {
4856 traceparent: Some(
4857 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
4858 ),
4859 ..YamlWorkflowTraceContextInput::default()
4860 });
4861
4862 let context = resolve_telemetry_context(&options, None);
4863
4864 assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
4865 assert_eq!(
4866 context.trace_id.as_deref(),
4867 Some("4bf92f3577b34da6a3ce929d0e0e4736")
4868 );
4869 }
4870
    // When no explicit trace_id is given, the run derives it from the W3C
    // traceparent header supplied in the trace context.
    #[tokio::test]
    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
        let yaml = r#"
id: traceparent-derived
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    traceparent: Some(traceparent.to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // trace_id is the middle field of the traceparent header.
        assert_eq!(
            output.trace_id.as_deref(),
            Some("4bf92f3577b34da6a3ce929d0e0e4736")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
4929
    // A telemetry sample_rate of 1.0 must mark every trace as sampled.
    #[tokio::test]
    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
        let yaml = r#"
id: sample-rate-one
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-one".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(true)
        );
    }
4984
    // With a fractional sample_rate, the sampling decision for a fixed trace
    // id must be deterministic: repeated runs yield the same sampled flag.
    #[tokio::test]
    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
        let yaml = r#"
id: sample-rate-deterministic
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.5,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-deterministic".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        // Run three times with the identical trace id and collect the flags.
        let mut sampled_values = Vec::new();
        for _ in 0..3 {
            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
                &workflow,
                &json!({"email_text":"hello"}),
                &MockExecutor,
                None,
                None,
                &options,
            )
            .await
            .expect("workflow should execute");

            let sampled = output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool)
                .expect("sampled flag should be present");
            sampled_values.push(sampled);
        }

        // Every run must have made the same decision.
        assert!(sampled_values
            .iter()
            .all(|value| *value == sampled_values[0]));
    }
5044
    // A sample_rate above 1.0 is invalid input and must be rejected with an
    // InvalidInput error naming the offending option.
    #[tokio::test]
    async fn workflow_run_options_reject_invalid_sample_rate() {
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.1,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect_err("invalid sample_rate should fail");

        match err {
            YamlWorkflowRunError::InvalidInput { message } => {
                assert!(message.contains("telemetry.sample_rate"));
            }
            _ => panic!("expected invalid input error for sample_rate"),
        }
    }
5088
    // NaN is not a valid sample_rate either; validation must catch it rather
    // than letting NaN comparisons silently pass range checks.
    #[tokio::test]
    async fn workflow_run_options_reject_nan_sample_rate() {
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: f32::NAN,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect_err("nan sample_rate should fail");

        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
    }
5127
5128 #[test]
5129 fn subworkflow_options_inherit_parent_telemetry_configuration() {
5130 let mut parent_options = YamlWorkflowRunOptions::default();
5131 parent_options.telemetry.sample_rate = 0.0;
5132 parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
5133 parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
5134 parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
5135
5136 let parent_context = TraceContext {
5137 trace_id: Some("trace-parent".to_string()),
5138 span_id: Some("span-parent".to_string()),
5139 parent_span_id: None,
5140 traceparent: Some("00-trace-parent-span-parent-01".to_string()),
5141 tracestate: None,
5142 baggage: BTreeMap::new(),
5143 };
5144
5145 let subworkflow_options =
5146 build_subworkflow_options(&parent_options, Some(&parent_context), None);
5147
5148 assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
5149 assert_eq!(
5150 subworkflow_options.telemetry.payload_mode,
5151 YamlWorkflowPayloadMode::RedactedPayload
5152 );
5153 assert_eq!(
5154 subworkflow_options.telemetry.tool_trace_mode,
5155 YamlToolTraceMode::Redacted
5156 );
5157 assert_eq!(
5158 subworkflow_options.trace.tenant.conversation_id.as_deref(),
5159 Some("conv-123")
5160 );
5161 assert_eq!(
5162 subworkflow_options
5163 .trace
5164 .context
5165 .as_ref()
5166 .and_then(|ctx| ctx.trace_id.as_deref()),
5167 Some("trace-parent")
5168 );
5169 }
5170
5171 #[test]
5172 fn trace_tenant_attributes_include_langfuse_aliases() {
5173 let tenant = YamlWorkflowTraceTenantContext {
5174 workspace_id: Some("ws-1".to_string()),
5175 user_id: Some("user-1".to_string()),
5176 conversation_id: Some("conv-1".to_string()),
5177 request_id: Some("req-1".to_string()),
5178 run_id: Some("run-1".to_string()),
5179 };
5180 let mut span = CapturingSpan::default();
5181 apply_trace_tenant_attributes_from_tenant(&mut span, &tenant);
5182
5183 assert_eq!(
5184 span.attributes
5185 .get("tenant.workspace_id")
5186 .map(String::as_str),
5187 Some("ws-1")
5188 );
5189 assert_eq!(
5190 span.attributes.get("tenant.user_id").map(String::as_str),
5191 Some("user-1")
5192 );
5193 assert_eq!(
5194 span.attributes
5195 .get("tenant.conversation_id")
5196 .map(String::as_str),
5197 Some("conv-1")
5198 );
5199 assert_eq!(
5200 span.attributes.get("langfuse.user.id").map(String::as_str),
5201 Some("user-1")
5202 );
5203 assert_eq!(
5204 span.attributes
5205 .get("langfuse.session.id")
5206 .map(String::as_str),
5207 Some("conv-1")
5208 );
5209 }
5210
5211 #[test]
5212 fn langfuse_nerdstats_attributes_are_written_when_enabled() {
5213 let output = YamlWorkflowRunOutput {
5214 workflow_id: "wf-1".to_string(),
5215 entry_node: "start".to_string(),
5216 email_text: "email".to_string(),
5217 trace: vec!["node-1".to_string()],
5218 outputs: BTreeMap::new(),
5219 terminal_node: "node-1".to_string(),
5220 terminal_output: None,
5221 step_timings: vec![YamlStepTiming {
5222 node_id: "node-1".to_string(),
5223 node_kind: "llm_call".to_string(),
5224 model_name: Some("gpt-4.1-mini".to_string()),
5225 elapsed_ms: 42,
5226 prompt_tokens: Some(4),
5227 completion_tokens: Some(6),
5228 total_tokens: Some(10),
5229 reasoning_tokens: Some(2),
5230 tokens_per_second: Some(14.0),
5231 }],
5232 llm_node_metrics: BTreeMap::new(),
5233 llm_node_models: BTreeMap::new(),
5234 total_elapsed_ms: 42,
5235 ttft_ms: Some(7),
5236 total_input_tokens: 4,
5237 total_output_tokens: 6,
5238 total_tokens: 10,
5239 total_reasoning_tokens: Some(2),
5240 tokens_per_second: 14.0,
5241 trace_id: Some("trace-1".to_string()),
5242 metadata: None,
5243 };
5244
5245 let mut span = CapturingSpan::default();
5246 apply_langfuse_nerdstats_attributes(&mut span, &output, true);
5247
5248 assert_eq!(
5249 span.attributes
5250 .get("langfuse.trace.metadata.nerdstats.workflow_id")
5251 .map(String::as_str),
5252 Some("wf-1")
5253 );
5254 assert_eq!(
5255 span.attributes
5256 .get("langfuse.trace.metadata.nerdstats.total_tokens")
5257 .map(String::as_str),
5258 Some("10")
5259 );
5260 assert_eq!(
5261 span.attributes
5262 .get("langfuse.trace.metadata.nerdstats.token_metrics_available")
5263 .map(String::as_str),
5264 Some("true")
5265 );
5266 assert!(span
5267 .attributes
5268 .contains_key("langfuse.trace.metadata.nerdstats"));
5269 }
5270
5271 #[test]
5272 fn langfuse_trace_input_output_and_usage_are_written() {
5273 let output = YamlWorkflowRunOutput {
5274 workflow_id: "wf-1".to_string(),
5275 entry_node: "start".to_string(),
5276 email_text: "email".to_string(),
5277 trace: vec!["node-1".to_string()],
5278 outputs: BTreeMap::new(),
5279 terminal_node: "node-1".to_string(),
5280 terminal_output: Some(json!({"final":"ok"})),
5281 step_timings: Vec::new(),
5282 llm_node_metrics: BTreeMap::new(),
5283 llm_node_models: BTreeMap::new(),
5284 total_elapsed_ms: 10,
5285 ttft_ms: None,
5286 total_input_tokens: 11,
5287 total_output_tokens: 22,
5288 total_tokens: 33,
5289 total_reasoning_tokens: Some(4),
5290 tokens_per_second: 1.5,
5291 trace_id: Some("trace-1".to_string()),
5292 metadata: None,
5293 };
5294
5295 let mut span = CapturingSpan::default();
5296 let input = json!({"email_text":"hello"});
5297 apply_langfuse_trace_input_output_attributes(
5298 &mut span,
5299 &input,
5300 &output,
5301 YamlWorkflowPayloadMode::FullPayload,
5302 );
5303
5304 assert_eq!(
5305 span.attributes
5306 .get("langfuse.trace.input")
5307 .map(String::as_str),
5308 Some("{\"email_text\":\"hello\"}")
5309 );
5310 assert_eq!(
5311 span.attributes
5312 .get("langfuse.trace.output")
5313 .map(String::as_str),
5314 Some("{\"final\":\"ok\"}")
5315 );
5316 assert!(span
5317 .attributes
5318 .contains_key("langfuse.trace.metadata.usage_details"));
5319 }
5320
5321 #[test]
5322 fn langfuse_observation_usage_attributes_are_written() {
5323 let usage = YamlLlmTokenUsage {
5324 prompt_tokens: 7,
5325 completion_tokens: 9,
5326 total_tokens: 16,
5327 reasoning_tokens: Some(3),
5328 };
5329 let mut span = CapturingSpan::default();
5330 apply_langfuse_observation_usage_attributes(&mut span, &usage);
5331
5332 assert_eq!(
5333 span.attributes
5334 .get("gen_ai.usage.input_tokens")
5335 .map(String::as_str),
5336 Some("7")
5337 );
5338 assert_eq!(
5339 span.attributes
5340 .get("gen_ai.usage.output_tokens")
5341 .map(String::as_str),
5342 Some("9")
5343 );
5344 assert_eq!(
5345 span.attributes
5346 .get("gen_ai.usage.total_tokens")
5347 .map(String::as_str),
5348 Some("16")
5349 );
5350 assert_eq!(
5351 span.attributes
5352 .get("gen_ai.usage.reasoning_tokens")
5353 .map(String::as_str),
5354 Some("3")
5355 );
5356 assert!(span
5357 .attributes
5358 .contains_key("langfuse.observation.usage_details"));
5359 }
5360
    // An explicitly supplied trace id must be used verbatim, and the chosen
    // payload mode and tenant context must be echoed in the run metadata.
    #[tokio::test]
    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
        let yaml = r#"
id: trace-options-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-123".to_string()),
                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
        // Payload mode echoed in snake_case in telemetry metadata.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("payload_mode"))
                .and_then(Value::as_str),
            Some("redacted_payload")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("trace"))
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
        );
    }
5429
5430 #[test]
5431 fn streamed_payload_parser_extracts_last_json_object() {
5432 let raw = r#"{"state":"missing_scenario","reason":"ok"}
5433
5434extra reasoning text
5435
5436{"state":"ready","reason":"final"}"#;
5437
5438 let resolved = parse_streamed_structured_payload(raw, false)
5439 .expect("parser should extract final JSON object");
5440 assert_eq!(resolved.payload["state"], "ready");
5441 assert!(resolved.heal_confidence.is_none());
5442 }
5443
5444 #[test]
5445 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
5446 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
5447
5448 let resolved = parse_streamed_structured_payload(raw, false)
5449 .expect("parser should recover final structured JSON object");
5450 assert_eq!(resolved.payload["state"], "ready");
5451 }
5452
5453 #[test]
5454 fn streamed_payload_parser_handles_markdown_with_heal() {
5455 let raw = r#"Some preface
5456```json
5457{
5458 "state": "missing_scenario",
5459 "reason": "Need more details"
5460}
5461```
5462Some trailing explanation"#;
5463
5464 let resolved = parse_streamed_structured_payload(raw, true)
5465 .expect("heal path should parse JSON block");
5466 assert_eq!(resolved.payload["state"], "missing_scenario");
5467 assert!(resolved.heal_confidence.is_some());
5468 }
5469
5470 #[test]
5471 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
5472 let raw = "No JSON in this streamed output";
5473 let error = parse_streamed_structured_payload(raw, false)
5474 .expect_err("strict stream parse should fail without JSON candidate");
5475 assert!(error.contains("no JSON object candidate found"));
5476 }
5477
5478 #[test]
5479 fn include_raw_stream_debug_events_defaults_to_false() {
5480 let _guard = stream_debug_env_lock()
5481 .lock()
5482 .expect("stream debug env lock should not be poisoned");
5483 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5484 assert!(!include_raw_stream_debug_events());
5485 }
5486
5487 #[test]
5488 fn include_raw_stream_debug_events_accepts_truthy_values() {
5489 let _guard = stream_debug_env_lock()
5490 .lock()
5491 .expect("stream debug env lock should not be poisoned");
5492 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
5493 assert!(include_raw_stream_debug_events());
5494 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5495 }
5496
5497 #[test]
5498 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
5499 let mut filter = StructuredJsonDeltaFilter::default();
5500 let chunks = vec![
5501 "I will think first... ",
5502 "{\"state\":\"missing_scenario\",",
5503 "\"reason\":\"Need more details\"}",
5504 " additional commentary",
5505 ];
5506
5507 let filtered = chunks
5508 .into_iter()
5509 .filter_map(|chunk| filter.split(chunk).0)
5510 .collect::<String>();
5511
5512 assert_eq!(
5513 filtered,
5514 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
5515 );
5516 }
5517
5518 #[test]
5519 fn structured_json_delta_filter_handles_braces_inside_strings() {
5520 let mut filter = StructuredJsonDeltaFilter::default();
5521 let chunks = vec![
5522 "preface ",
5523 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
5524 " trailing",
5525 ];
5526
5527 let filtered = chunks
5528 .into_iter()
5529 .filter_map(|chunk| filter.split(chunk).0)
5530 .collect::<String>();
5531
5532 assert_eq!(
5533 filtered,
5534 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
5535 );
5536 }
5537
5538 #[test]
5539 fn render_json_object_as_text_converts_top_level_fields() {
5540 let rendered =
5541 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
5542 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
5543
5544 assert_eq!(lines.len(), 3);
5545 assert!(lines.contains("question: q"));
5546 assert!(lines.contains("confidence: 0.8"));
5547 assert!(lines.contains("nested: {\"a\":1}"));
5548 }
5549
5550 #[test]
5551 fn stream_json_as_text_formatter_emits_once_when_complete() {
5552 let mut formatter = StreamJsonAsTextFormatter::default();
5553 formatter.push("{\"question\":\"hello\"}");
5554
5555 let first = formatter.emit_if_ready(true);
5556 let second = formatter.emit_if_ready(true);
5557
5558 assert_eq!(first, Some("question: hello".to_string()));
5559 assert_eq!(second, None);
5560 }
5561
5562 #[test]
5563 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
5564 let expr = "$.nodes.classify.output.output_total == 1";
5565 let rewritten = rewrite_yaml_condition_to_ir(expr);
5566 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
5567 }
5568
5569 #[tokio::test]
5570 async fn validates_workflow_input_before_ir_runtime_path() {
5571 let yaml = r#"
5572id: chat-workflow
5573entry_node: classify
5574nodes:
5575 - id: classify
5576 node_type:
5577 llm_call:
5578 model: gpt-4.1
5579 config:
5580 prompt: |
5581 classify
5582"#;
5583
5584 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5585 let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
5586 .await
5587 .expect_err("non-object input should fail before execution");
5588
5589 assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5590 }
5591
5592 #[test]
5593 fn interpolate_template_supports_dollar_prefixed_paths() {
5594 let context = json!({
5595 "input": {
5596 "email_text": "hello"
5597 }
5598 });
5599
5600 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
5601 assert_eq!(rendered, "value=hello");
5602 }
5603}