1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceMode, ToolChoiceTool, ToolDefinition, ToolType,
14};
15use simple_agents_core::{
16 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
17};
18use simple_agents_healing::JsonishParser;
19use thiserror::Error;
20
21use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
22use crate::observability::tracing::{
23 flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
24};
25use crate::runtime::{
26 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
27 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
28 WorkflowRuntimeOptions,
29};
30use crate::validation::validate_and_normalize;
31use crate::visualize::workflow_to_mermaid;
32
33mod runner;
34pub use runner::WorkflowRunner;
35mod api;
36mod client_executor;
37mod context;
38mod events;
39mod execute;
40mod globals;
41mod llm_tools;
42mod node_execution;
43mod spans;
44mod typed_contracts;
45mod validation;
46pub use api::{
47 run_email_workflow_yaml, run_email_workflow_yaml_file,
48 run_email_workflow_yaml_file_with_client,
49 run_email_workflow_yaml_file_with_client_and_custom_worker,
50 run_email_workflow_yaml_file_with_client_and_custom_worker_and_events,
51 run_email_workflow_yaml_with_client, run_email_workflow_yaml_with_client_and_custom_worker,
52 run_email_workflow_yaml_with_client_and_custom_worker_and_events,
53 run_email_workflow_yaml_with_custom_worker,
54 run_email_workflow_yaml_with_custom_worker_and_events, run_workflow_yaml,
55 run_workflow_yaml_file, run_workflow_yaml_file_with_client,
56 run_workflow_yaml_file_with_client_and_custom_worker,
57 run_workflow_yaml_file_with_client_and_custom_worker_and_events,
58 run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options,
59 run_workflow_yaml_with_client, run_workflow_yaml_with_client_and_custom_worker,
60 run_workflow_yaml_with_client_and_custom_worker_and_events,
61 run_workflow_yaml_with_custom_worker, run_workflow_yaml_with_custom_worker_and_events,
62};
63use client_executor::BorrowedClientExecutor;
64use context::{
65 build_yaml_context_from_ir_scope, collect_template_bindings, evaluate_switch_condition,
66 interpolate_template, json_type_name, parse_messages_from_context, resolve_path,
67};
68use globals::{apply_set_globals, apply_update_globals};
69use llm_tools::{
70 default_llm_output_schema, llm_output_schema_for_node, normalize_llm_tools,
71 normalize_tool_choice,
72};
73pub use typed_contracts::{
74 YamlWorkflowEventType, YamlWorkflowNodeKind, YamlWorkflowNodeOutputRecord,
75 YamlWorkflowRunTypedOutput, YamlWorkflowTypedEvent,
76};
77pub use validation::verify_yaml_workflow;
78
// Synthetic node id injected as the workflow entry point (not user-defined).
const YAML_START_NODE_ID: &str = "__yaml_start";
// Synthetic tool id representing the built-in LLM call executor.
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";
// Upper bound on accepted workflow YAML size: 1 MiB.
const MAX_WORKFLOW_YAML_BYTES: u64 = 1024 * 1024;
// Maximum YAML nesting depth accepted when parsing workflow definitions.
const MAX_WORKFLOW_YAML_DEPTH: usize = 64;

// Process-local monotonic counter mixed into generated trace ids; see
// `generate_trace_id` for how it disambiguates same-instant id generation.
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
85
/// Timing and (optional) token-usage record for one executed workflow node.
///
/// Token fields stay `None` when the node reported no provider usage (all
/// `None` fields are omitted from serialized output).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    pub node_id: String,
    /// Node kind label, e.g. "llm_call" (see `workflow_nerdstats` filtering).
    pub node_kind: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    pub elapsed_ms: u128,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
104
/// Usage metrics for a single LLM node where provider usage was available
/// (unlike `YamlStepTiming`, the token counts here are required).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
115
/// Aggregated result of a single YAML workflow run: the execution path,
/// per-node outputs, per-step timings, and run-level token/latency totals.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    /// Raw input text handed to the run (email-oriented entry points exist
    /// alongside the generic ones; see the `run_email_workflow_*` API).
    pub email_text: String,
    /// Node ids recorded for the run — presumably in execution order; the
    /// executor lives outside this chunk.
    pub trace: Vec<String>,
    /// Recorded output payload per node id.
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    /// LLM usage metrics keyed by node id.
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    /// Model name keyed by LLM node id.
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    /// Time to first token, when streaming reported one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    pub tokens_per_second: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
142
/// Controls whether trace input/output payloads are recorded verbatim or
/// replaced by a `{"redacted": true, "value_type": ...}` stub
/// (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
150
/// Redaction policy for tool-call payloads in traces: record fully, replace
/// with a redaction stub, or drop entirely (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
159
/// Caller-supplied trace propagation context. All fields are optional;
/// `traceparent`/`tracestate` follow the W3C Trace Context header format
/// (see `trace_id_from_traceparent`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
175
/// Optional tenant identifiers attached to spans and worker contexts
/// (see `apply_trace_tenant_attributes_from_tenant`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
189
/// Telemetry configuration for a run. The serde defaults here must stay in
/// sync with the hand-written `Default` impl below (enabled/nerdstats/
/// multi_tenant true, sample_rate 1.0, retention 30 days, full payloads).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Emit the "nerdstats" diagnostics blob (see `workflow_nerdstats`).
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Trace sampling ratio in [0.0, 1.0]; validated by `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
207
208impl Default for YamlWorkflowTelemetryConfig {
209 fn default() -> Self {
210 Self {
211 enabled: true,
212 nerdstats: true,
213 sample_rate: 1.0,
214 payload_mode: YamlWorkflowPayloadMode::FullPayload,
215 retention_days: 30,
216 multi_tenant: true,
217 tool_trace_mode: YamlToolTraceMode::Full,
218 }
219 }
220}
221
/// Trace-related run options: optional propagation context plus tenant ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
229
/// Per-run options: telemetry configuration, trace context/tenant info, and
/// an optional model override applied to LLM nodes
/// (see `resolve_requested_model`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    #[serde(default)]
    pub model: Option<String>,
}
239
/// Token usage reported by the provider for one LLM completion.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only when the provider reports reasoning tokens separately.
    pub reasoning_tokens: Option<u32>,
}
247
/// Outcome of one LLM node execution: the structured payload, provider usage
/// (if reported), time to first token, and any tool calls made along the way.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
255
/// Trace record for a single tool call issued during an LLM node.
/// `arguments`/`output` may be redacted or nulled depending on
/// `YamlToolTraceMode` (see `payload_for_tool_trace`).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    pub arguments: Value,
    pub output: Option<Value>,
    /// Status label string — set by the executor outside this chunk.
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
266
267#[derive(Debug, Clone, Default)]
268struct YamlTokenTotals {
269 input_tokens: u64,
270 output_tokens: u64,
271 total_tokens: u64,
272 reasoning_tokens: Option<u64>,
273}
274
275impl YamlTokenTotals {
276 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
277 self.input_tokens += u64::from(usage.prompt_tokens);
278 self.output_tokens += u64::from(usage.completion_tokens);
279 self.total_tokens += u64::from(usage.total_tokens);
280
281 if let Some(reasoning_tokens) = usage.reasoning_tokens {
282 let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
283 self.reasoning_tokens = Some(next);
284 }
285 }
286
287 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
288 if elapsed_ms == 0 {
289 return 0.0;
290 }
291 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
292 }
293}
294
/// Rounds a float to two decimal places using `f64::round`
/// (ties away from zero).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}
298
/// Completion-token throughput in tokens/second, rounded to two decimal
/// places; a zero elapsed time yields 0.0 rather than dividing by zero.
fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
    match elapsed_ms {
        0 => 0.0,
        _ => {
            let rate = f64::from(completion_tokens) * 1000.0 / (elapsed_ms as f64);
            // Two-decimal rounding, same as `round_two_decimals`.
            (rate * 100.0).round() / 100.0
        }
    }
}
305
/// Picks the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's own configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => node_model.to_string(),
    }
}
318
// Serde default helper for fields tagged `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}

// Serde default for `YamlWorkflowTelemetryConfig::sample_rate`.
fn default_sample_rate() -> f32 {
    1.0
}

// Serde default for `YamlWorkflowTelemetryConfig::retention_days`.
fn default_retention_days() -> u32 {
    30
}
330
331fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
332 if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
333 return Ok(());
334 }
335
336 Err(YamlWorkflowRunError::InvalidInput {
337 message: format!(
338 "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
339 ),
340 })
341}
342
/// Deterministic sampling decision for a trace id: rates >= 1.0 always sample,
/// rates <= 0.0 never do; otherwise an FNV-1a hash of the id is mapped to a
/// stable pseudo-random ratio in [0, 1) and compared against the rate.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a, 64-bit: standard offset basis and prime.
    let hash = trace_id
        .bytes()
        .fold(0xcbf29ce484222325u64, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
        });

    ((hash as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
360
/// Extracts the trace id from a W3C `traceparent` header
/// (`version-traceid-spanid-flags`). Returns the lowercased 32-hex-digit
/// trace id, or `None` when the header is malformed: wrong segment count,
/// wrong lengths, non-hex characters, or the invalid all-zero trace id.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let segments: Vec<&str> = traceparent.split('-').collect();
    let &[version, trace_id, _span_id, _flags] = segments.as_slice() else {
        return None;
    };

    let is_hex = |value: &str| value.chars().all(|ch| ch.is_ascii_hexdigit());
    if version.len() != 2 || !is_hex(version) {
        return None;
    }
    if trace_id.len() != 32 || !is_hex(trace_id) {
        return None;
    }
    // The spec defines the all-zero trace id as invalid.
    if trace_id.bytes().all(|byte| byte == b'0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
387
/// Where the run's trace id came from, in the precedence order used by
/// `resolve_run_trace_id_with_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    // Telemetry disabled: no trace id at all.
    Disabled,
    // Explicit `trace_id` in the caller-supplied trace context.
    ExplicitTraceId,
    // Trace id inherited from the parent span's trace context.
    ParentTraceId,
    // Parsed out of the caller-supplied `traceparent` header.
    Traceparent,
    // Parsed out of the parent context's `traceparent` header.
    ParentTraceparent,
    // Freshly generated because no other source applied.
    Generated,
}
397
/// Outcome of telemetry resolution: the trace id (if telemetry is enabled),
/// the sampling decision, and where the id came from.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
404
405fn resolve_run_trace_id_with_source(
406 options: &YamlWorkflowRunOptions,
407 parent_trace_context: Option<&TraceContext>,
408) -> (Option<String>, TraceIdSource) {
409 if !options.telemetry.enabled {
410 return (None, TraceIdSource::Disabled);
411 }
412
413 if let Some(trace_id) = options
414 .trace
415 .context
416 .as_ref()
417 .and_then(|context| context.trace_id.clone())
418 {
419 return (Some(trace_id), TraceIdSource::ExplicitTraceId);
420 }
421
422 if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
423 return (Some(trace_id), TraceIdSource::ParentTraceId);
424 }
425
426 if let Some(trace_id) = options
427 .trace
428 .context
429 .as_ref()
430 .and_then(|context| context.traceparent.as_deref())
431 .and_then(trace_id_from_traceparent)
432 {
433 return (Some(trace_id), TraceIdSource::Traceparent);
434 }
435
436 if let Some(trace_id) = parent_trace_context
437 .and_then(|context| context.traceparent.as_deref())
438 .and_then(trace_id_from_traceparent)
439 {
440 return (Some(trace_id), TraceIdSource::ParentTraceparent);
441 }
442
443 (Some(generate_trace_id()), TraceIdSource::Generated)
444}
445
446fn resolve_telemetry_context(
447 options: &YamlWorkflowRunOptions,
448 parent_trace_context: Option<&TraceContext>,
449) -> ResolvedTelemetryContext {
450 let (trace_id, trace_id_source) =
451 resolve_run_trace_id_with_source(options, parent_trace_context);
452 let sampled = trace_id
453 .as_deref()
454 .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
455 .unwrap_or(false);
456
457 ResolvedTelemetryContext {
458 trace_id,
459 sampled,
460 trace_id_source,
461 }
462}
463
464fn generate_trace_id() -> String {
465 use std::time::{SystemTime, UNIX_EPOCH};
466
467 let now_nanos = SystemTime::now()
468 .duration_since(UNIX_EPOCH)
469 .map(|duration| duration.as_nanos())
470 .unwrap_or(0);
471 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
472 format!("{:032x}", now_nanos ^ sequence)
473}
474
475fn workflow_metadata_with_trace(
476 options: &YamlWorkflowRunOptions,
477 trace_id: &str,
478 sampled: bool,
479 trace_id_source: TraceIdSource,
480) -> Value {
481 json!({
482 "telemetry": {
483 "trace_id": trace_id,
484 "trace_id_source": match trace_id_source {
485 TraceIdSource::Disabled => "disabled",
486 TraceIdSource::ExplicitTraceId => "explicit_trace_id",
487 TraceIdSource::ParentTraceId => "parent_trace_id",
488 TraceIdSource::Traceparent => "traceparent",
489 TraceIdSource::ParentTraceparent => "parent_traceparent",
490 TraceIdSource::Generated => "generated",
491 },
492 "enabled": options.telemetry.enabled,
493 "sampled": sampled,
494 "nerdstats": options.telemetry.nerdstats,
495 "sample_rate": options.telemetry.sample_rate,
496 "payload_mode": match options.telemetry.payload_mode {
497 YamlWorkflowPayloadMode::FullPayload => "full_payload",
498 YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
499 },
500 "retention_days": options.telemetry.retention_days,
501 "multi_tenant": options.telemetry.multi_tenant,
502 "tool_trace_mode": match options.telemetry.tool_trace_mode {
503 YamlToolTraceMode::Full => "full",
504 YamlToolTraceMode::Redacted => "redacted",
505 YamlToolTraceMode::Off => "off",
506 },
507 },
508 "trace": {
509 "tenant": {
510 "workspace_id": options.trace.tenant.workspace_id,
511 "user_id": options.trace.tenant.user_id,
512 "conversation_id": options.trace.tenant.conversation_id,
513 "request_id": options.trace.tenant.request_id,
514 "run_id": options.trace.tenant.run_id,
515 }
516 },
517 })
518}
519
520fn apply_trace_identity_attributes(span: &mut dyn WorkflowSpan, trace_id: Option<&str>) {
521 if let Some(value) = trace_id {
522 span.set_attribute("trace_id", value);
523 }
524}
525
526fn apply_trace_tenant_attributes_from_tenant(
527 span: &mut dyn WorkflowSpan,
528 tenant: &YamlWorkflowTraceTenantContext,
529) {
530 if let Some(workspace_id) = tenant.workspace_id.as_deref() {
531 span.set_attribute("tenant.workspace_id", workspace_id);
532 }
533 if let Some(user_id) = tenant.user_id.as_deref() {
534 span.set_attribute("tenant.user_id", user_id);
535 span.set_attribute("user.id", user_id);
536 span.set_attribute("langfuse.user.id", user_id);
537 }
538 if let Some(conversation_id) = tenant.conversation_id.as_deref() {
539 span.set_attribute("tenant.conversation_id", conversation_id);
540 span.set_attribute("session.id", conversation_id);
541 span.set_attribute("langfuse.session.id", conversation_id);
542 }
543 if let Some(request_id) = tenant.request_id.as_deref() {
544 span.set_attribute("tenant.request_id", request_id);
545 }
546 if let Some(run_id) = tenant.run_id.as_deref() {
547 span.set_attribute("tenant.run_id", run_id);
548 }
549}
550
/// Convenience wrapper: applies the tenant attributes from run options.
fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
    apply_trace_tenant_attributes_from_tenant(span, &options.trace.tenant);
}
554
555fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
556 let llm_nodes_without_usage: Vec<String> = output
557 .step_timings
558 .iter()
559 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
560 .map(|step| step.node_id.clone())
561 .collect();
562 let token_metrics_available = llm_nodes_without_usage.is_empty();
563 let token_metrics_source = if token_metrics_available {
564 "provider_usage"
565 } else {
566 "provider_stream_usage_unavailable"
567 };
568 let total_input_tokens = if token_metrics_available {
569 json!(output.total_input_tokens)
570 } else {
571 Value::Null
572 };
573 let total_output_tokens = if token_metrics_available {
574 json!(output.total_output_tokens)
575 } else {
576 Value::Null
577 };
578 let total_tokens = if token_metrics_available {
579 json!(output.total_tokens)
580 } else {
581 Value::Null
582 };
583 let total_reasoning_tokens = if token_metrics_available {
584 json!(output.total_reasoning_tokens)
585 } else {
586 Value::Null
587 };
588 let tokens_per_second = if token_metrics_available {
589 json!(output.tokens_per_second)
590 } else {
591 Value::Null
592 };
593
594 json!({
595 "workflow_id": output.workflow_id,
596 "terminal_node": output.terminal_node,
597 "total_elapsed_ms": output.total_elapsed_ms,
598 "ttft_ms": output.ttft_ms,
599 "step_details": output.step_timings,
600 "total_input_tokens": total_input_tokens,
601 "total_output_tokens": total_output_tokens,
602 "total_tokens": total_tokens,
603 "total_reasoning_tokens": total_reasoning_tokens,
604 "tokens_per_second": tokens_per_second,
605 "trace_id": output.trace_id,
606 "token_metrics_available": token_metrics_available,
607 "token_metrics_source": token_metrics_source,
608 "llm_nodes_without_usage": llm_nodes_without_usage,
609 })
610}
611
612fn apply_langfuse_nerdstats_attributes(
613 span: &mut dyn WorkflowSpan,
614 output: &YamlWorkflowRunOutput,
615 enabled: bool,
616) {
617 if !enabled {
618 return;
619 }
620
621 let nerdstats = workflow_nerdstats(output);
622 let nerdstats_json = nerdstats.to_string();
623 span.set_attribute("langfuse.trace.metadata.nerdstats", nerdstats_json.as_str());
624
625 span.set_attribute(
626 "langfuse.trace.metadata.nerdstats.workflow_id",
627 output.workflow_id.as_str(),
628 );
629 span.set_attribute(
630 "langfuse.trace.metadata.nerdstats.terminal_node",
631 output.terminal_node.as_str(),
632 );
633 span.set_attribute(
634 "langfuse.trace.metadata.nerdstats.total_elapsed_ms",
635 output.total_elapsed_ms.to_string().as_str(),
636 );
637 span.set_attribute(
638 "langfuse.trace.metadata.nerdstats.step_details_count",
639 output.step_timings.len().to_string().as_str(),
640 );
641 span.set_attribute(
642 "langfuse.trace.metadata.nerdstats.total_input_tokens",
643 output.total_input_tokens.to_string().as_str(),
644 );
645 span.set_attribute(
646 "langfuse.trace.metadata.nerdstats.total_output_tokens",
647 output.total_output_tokens.to_string().as_str(),
648 );
649 span.set_attribute(
650 "langfuse.trace.metadata.nerdstats.total_tokens",
651 output.total_tokens.to_string().as_str(),
652 );
653 span.set_attribute(
654 "langfuse.trace.metadata.nerdstats.tokens_per_second",
655 output.tokens_per_second.to_string().as_str(),
656 );
657
658 if let Some(ttft_ms) = output.ttft_ms {
659 span.set_attribute(
660 "langfuse.trace.metadata.nerdstats.ttft_ms",
661 ttft_ms.to_string().as_str(),
662 );
663 }
664
665 if let Some(reasoning_tokens) = output.total_reasoning_tokens {
666 span.set_attribute(
667 "langfuse.trace.metadata.nerdstats.total_reasoning_tokens",
668 reasoning_tokens.to_string().as_str(),
669 );
670 }
671
672 let llm_nodes_without_usage_count = output
673 .step_timings
674 .iter()
675 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
676 .count();
677 let token_metrics_available = llm_nodes_without_usage_count == 0;
678 span.set_attribute(
679 "langfuse.trace.metadata.nerdstats.token_metrics_available",
680 if token_metrics_available {
681 "true"
682 } else {
683 "false"
684 },
685 );
686 span.set_attribute(
687 "langfuse.trace.metadata.nerdstats.token_metrics_source",
688 if token_metrics_available {
689 "provider_usage"
690 } else {
691 "provider_stream_usage_unavailable"
692 },
693 );
694 span.set_attribute(
695 "langfuse.trace.metadata.nerdstats.llm_nodes_without_usage_count",
696 llm_nodes_without_usage_count.to_string().as_str(),
697 );
698}
699
700fn apply_langfuse_trace_input_output_attributes(
701 span: &mut dyn WorkflowSpan,
702 workflow_input: &Value,
703 output: &YamlWorkflowRunOutput,
704 payload_mode: YamlWorkflowPayloadMode,
705) {
706 let trace_input = payload_for_span(payload_mode, workflow_input);
707 span.set_attribute("langfuse.trace.input", trace_input.as_str());
708
709 if let Some(terminal_output) = output.terminal_output.as_ref() {
710 let trace_output = payload_for_span(payload_mode, terminal_output);
711 span.set_attribute("langfuse.trace.output", trace_output.as_str());
712 }
713
714 let usage_details = json!({
715 "input": output.total_input_tokens,
716 "output": output.total_output_tokens,
717 "total": output.total_tokens,
718 "reasoning": output.total_reasoning_tokens,
719 })
720 .to_string();
721 span.set_attribute(
722 "langfuse.trace.metadata.usage_details",
723 usage_details.as_str(),
724 );
725}
726
727fn apply_langfuse_observation_usage_attributes(
728 span: &mut dyn WorkflowSpan,
729 usage: &YamlLlmTokenUsage,
730) {
731 let usage_details = json!({
732 "input": usage.prompt_tokens,
733 "output": usage.completion_tokens,
734 "total": usage.total_tokens,
735 "reasoning": usage.reasoning_tokens,
736 })
737 .to_string();
738 span.set_attribute("langfuse.observation.usage_details", usage_details.as_str());
739 span.set_attribute(
740 "gen_ai.usage.input_tokens",
741 usage.prompt_tokens.to_string().as_str(),
742 );
743 span.set_attribute(
744 "gen_ai.usage.output_tokens",
745 usage.completion_tokens.to_string().as_str(),
746 );
747 span.set_attribute(
748 "gen_ai.usage.total_tokens",
749 usage.total_tokens.to_string().as_str(),
750 );
751 if let Some(reasoning_tokens) = usage.reasoning_tokens {
752 span.set_attribute(
753 "gen_ai.usage.reasoning_tokens",
754 reasoning_tokens.to_string().as_str(),
755 );
756 }
757}
758
759fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
760 match mode {
761 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
762 YamlWorkflowPayloadMode::RedactedPayload => json!({
763 "redacted": true,
764 "value_type": match payload {
765 Value::Null => "null",
766 Value::Bool(_) => "bool",
767 Value::Number(_) => "number",
768 Value::String(_) => "string",
769 Value::Array(_) => "array",
770 Value::Object(_) => "object",
771 }
772 })
773 .to_string(),
774 }
775}
776
777fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
778 match mode {
779 YamlToolTraceMode::Full => payload.clone(),
780 YamlToolTraceMode::Redacted => json!({
781 "redacted": true,
782 "value_type": json_type_name(payload),
783 }),
784 YamlToolTraceMode::Off => Value::Null,
785 }
786}
787
788fn validate_json_schema(schema: &Value) -> Result<(), String> {
789 jsonschema::JSONSchema::compile(schema)
790 .map(|_| ())
791 .map_err(|error| format!("invalid JSON schema: {error}"))
792}
793
794fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
795 let validator = jsonschema::JSONSchema::compile(schema)
796 .map_err(|error| format!("invalid JSON schema: {error}"))?;
797 if let Err(errors) = validator.validate(instance) {
798 let message = errors
799 .into_iter()
800 .next()
801 .map(|error| error.to_string())
802 .unwrap_or_else(|| "unknown schema validation error".to_string());
803 return Err(format!("schema validation failed: {message}"));
804 }
805 Ok(())
806}
807
808fn schema_type(schema: &Value) -> Option<&str> {
809 schema.get("type").and_then(Value::as_str)
810}
811
/// True when the schema's declared `"type"` is exactly `"object"`.
fn schema_expects_object(schema: &Value) -> bool {
    schema_type(schema) == Some("object")
}
815
816fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
817 options.trace.context.as_ref().map(|input| TraceContext {
818 trace_id: input.trace_id.clone(),
819 span_id: input.span_id.clone(),
820 parent_span_id: input.parent_span_id.clone(),
821 traceparent: input.traceparent.clone(),
822 tracestate: input.tracestate.clone(),
823 baggage: input.baggage.clone(),
824 })
825}
826
827fn merged_trace_context_for_worker(
828 span_context: Option<&TraceContext>,
829 resolved_trace_id: Option<&str>,
830 options: &YamlWorkflowRunOptions,
831) -> TraceContext {
832 let input_context = options.trace.context.as_ref();
833 let baggage = if let Some(context) = span_context {
834 if !context.baggage.is_empty() {
835 context.baggage.clone()
836 } else {
837 input_context
838 .map(|value| value.baggage.clone())
839 .unwrap_or_default()
840 }
841 } else {
842 input_context
843 .map(|value| value.baggage.clone())
844 .unwrap_or_default()
845 };
846
847 TraceContext {
848 trace_id: span_context
849 .and_then(|context| context.trace_id.clone())
850 .or_else(|| resolved_trace_id.map(|value| value.to_string()))
851 .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
852 span_id: span_context
853 .and_then(|context| context.span_id.clone())
854 .or_else(|| input_context.and_then(|context| context.span_id.clone())),
855 parent_span_id: span_context
856 .and_then(|context| context.parent_span_id.clone())
857 .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
858 traceparent: span_context
859 .and_then(|context| context.traceparent.clone())
860 .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
861 tracestate: span_context
862 .and_then(|context| context.tracestate.clone())
863 .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
864 baggage,
865 }
866}
867
868fn custom_worker_context_with_trace(
869 context: &Value,
870 trace_context: &TraceContext,
871 tenant_context: &YamlWorkflowTraceTenantContext,
872) -> Value {
873 let mut context_with_trace = context.clone();
874 let Some(root) = context_with_trace.as_object_mut() else {
875 return context_with_trace;
876 };
877
878 root.insert(
879 "trace".to_string(),
880 json!({
881 "context": {
882 "trace_id": trace_context.trace_id,
883 "span_id": trace_context.span_id,
884 "parent_span_id": trace_context.parent_span_id,
885 "traceparent": trace_context.traceparent,
886 "tracestate": trace_context.tracestate,
887 "baggage": trace_context.baggage,
888 },
889 "tenant": {
890 "workspace_id": tenant_context.workspace_id,
891 "user_id": tenant_context.user_id,
892 "conversation_id": tenant_context.conversation_id,
893 "request_id": tenant_context.request_id,
894 "run_id": tenant_context.run_id,
895 }
896 }),
897 );
898
899 context_with_trace
900}
901
/// Debug switch: true when SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW is set
/// to a truthy value ("1"/"true"/"yes"/"on", case-insensitive, trimmed).
/// Unset or unreadable env values count as false.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
911
/// Structured payload recovered from a streamed completion, plus the
/// confidence reported by the healing parser when healing was applied
/// (`None` when the JSON parsed cleanly — see
/// `parse_streamed_structured_payload`).
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    heal_confidence: Option<f32>,
}
917
918#[derive(Debug, Default)]
919struct StreamJsonAsTextFormatter {
920 raw_json: String,
921 emitted: bool,
922}
923
924impl StreamJsonAsTextFormatter {
925 fn push(&mut self, chunk: &str) {
926 self.raw_json.push_str(chunk);
927 }
928
929 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
930 if self.emitted || !complete {
931 return None;
932 }
933 self.emitted = true;
934 Some(render_json_object_as_text(self.raw_json.as_str()))
935 }
936}
937
938fn render_json_object_as_text(raw_json: &str) -> String {
939 let value = match serde_json::from_str::<Value>(raw_json) {
940 Ok(value) => value,
941 Err(_) => return raw_json.to_string(),
942 };
943 let Some(object) = value.as_object() else {
944 return raw_json.to_string();
945 };
946
947 let mut lines = Vec::with_capacity(object.len());
948 for (key, value) in object {
949 let rendered = match value {
950 Value::String(text) => text.clone(),
951 _ => value.to_string(),
952 };
953 lines.push(format!("{key}: {rendered}"));
954 }
955 lines.join("\n")
956}
957
/// Classifies a streamed token as structured output or "thinking" text
/// (the split performed by `StructuredJsonDeltaFilter`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
964
/// Streaming scanner that separates the first balanced top-level JSON object
/// in a token stream from any surrounding free text. Text before the opening
/// `{` and after the object closes is treated as "thinking" output; string
/// literals and escapes inside the object are tracked so embedded braces do
/// not confuse the depth counter.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    started: bool,
    completed: bool,
    depth: u32,
    in_string: bool,
    escape: bool,
}

impl StructuredJsonDeltaFilter {
    /// Splits one streamed delta into `(json_output, thinking_text)`; each
    /// side is `None` when no characters landed there.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut json_part = String::new();
        let mut text_part = String::new();
        for ch in delta.chars() {
            if self.routes_to_json(ch) {
                json_part.push(ch);
            } else {
                text_part.push(ch);
            }
        }

        let wrap = |buffer: String| {
            if buffer.is_empty() {
                None
            } else {
                Some(buffer)
            }
        };
        (wrap(json_part), wrap(text_part))
    }

    /// Advances the scanner for one character and reports whether it belongs
    /// to the structured JSON object (true) or the thinking text (false).
    fn routes_to_json(&mut self, ch: char) -> bool {
        // Everything after the object closes is thinking text.
        if self.completed {
            return false;
        }

        // Before the first `{`, everything is thinking text.
        if !self.started {
            if ch != '{' {
                return false;
            }
            self.started = true;
            self.depth = 1;
            return true;
        }

        // Inside a string literal: consume escapes, watch for the closing quote.
        if self.in_string {
            if self.escape {
                self.escape = false;
            } else if ch == '\\' {
                self.escape = true;
            } else if ch == '"' {
                self.in_string = false;
            }
            return true;
        }

        match ch {
            '"' => self.in_string = true,
            '{' => self.depth = self.depth.saturating_add(1),
            '}' => {
                if self.depth > 0 {
                    self.depth -= 1;
                }
                if self.depth == 0 {
                    self.completed = true;
                }
            }
            _ => {}
        }
        true
    }

    /// True once the top-level object has been fully consumed.
    fn completed(&self) -> bool {
        self.completed
    }
}
1049
/// Returns the trimmed contents of the last "json"-tagged code fence in raw
/// LLM output, or `None` when no such fence exists, it is unterminated, or
/// its body is empty.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    let fence = "```json";
    let after_fence = &raw[raw.rfind(fence)? + fence.len()..];
    let body = after_fence[..after_fence.find("```")?].trim();
    if body.is_empty() {
        return None;
    }
    Some(body)
}
1060
/// Scans `raw` from `start_index` and returns the trimmed slice covering the
/// first balanced `{...}` group, tracking string literals and escapes so
/// braces inside strings are ignored. Returns `None` when a `}` appears
/// before any `{` or the braces never balance.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut open_braces = 0u32;
    let mut inside_string = false;
    let mut escaped = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        if inside_string {
            match ch {
                // A preceding backslash consumes this character unconditionally.
                _ if escaped => escaped = false,
                '\\' => escaped = true,
                '"' => inside_string = false,
                _ => {}
            }
            continue;
        }

        match ch {
            '"' => inside_string = true,
            '{' => open_braces = open_braces.saturating_add(1),
            '}' if open_braces == 0 => return None,
            '}' => {
                open_braces -= 1;
                if open_braces == 0 {
                    let end_index = start_index + offset + ch.len_utf8();
                    return Some(raw[start_index..end_index].trim());
                }
            }
            _ => {}
        }
    }

    None
}
1101
1102fn extract_last_parsable_object(raw: &str) -> Option<&str> {
1103 let starts: Vec<usize> = raw
1104 .char_indices()
1105 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
1106 .collect();
1107
1108 for start in starts.into_iter().rev() {
1109 let Some(candidate) = extract_balanced_object_from(raw, start) else {
1110 continue;
1111 };
1112 if serde_json::from_str::<Value>(candidate).is_ok() {
1113 return Some(candidate);
1114 }
1115 }
1116
1117 None
1118}
1119
1120fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
1121 extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
1122}
1123
/// Parses the accumulated text of a streamed structured completion.
///
/// Without healing: strict JSON is tried first, then a candidate object is
/// salvaged from surrounding prose and parsed strictly. With healing: the
/// candidate (or the raw text when none is found) is run through the jsonish
/// parser, which also yields a confidence score. Errors are human-readable
/// strings describing which stage failed.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the stream already produced strict JSON.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        // Salvage a JSON object embedded in surrounding prose.
        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    // Healing path: tolerate malformed JSON via the jsonish parser; fall back
    // to the raw text when no candidate object is found.
    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
1162
/// A single progress event delivered to a `YamlWorkflowEventSink`.
///
/// All fields except `event_type` are optional so one struct can describe
/// node lifecycle events, streamed token deltas, and free-form messages.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    // Streaming-token fields (populated on token events).
    pub delta: Option<String>,
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
1177
/// Role of a workflow chat message; aliases the agent `Role` type so callers
/// need not depend on `simple_agent_type` directly.
pub type WorkflowMessageRole = Role;
1179
/// A chat message supplied to (or produced by) a workflow LLM node.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    // Speaker role, reusing the agent `Role` type.
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    // Links a tool-result message back to its tool call; also accepted as
    // camelCase `toolCallId` on deserialization.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
1189
/// One template expression resolved against workflow state.
///
/// NOTE(review): field semantics are inferred from names — confirm against
/// the template-resolution code that constructs these bindings.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    // Ordinal of the binding within the template.
    pub index: usize,
    // Raw expression text as written in the template.
    pub expression: String,
    // State path the expression was resolved from.
    pub source_path: String,
    // Resolved JSON value.
    pub resolved: Value,
    // Human-readable name of the resolved value's JSON type.
    pub resolved_type: String,
    // True when the expression did not resolve to a value.
    pub missing: bool,
}
1199
/// Severity level of a workflow validation diagnostic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}
1205
/// A single validation finding, optionally attributed to one node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    pub node_id: Option<String>,
    // Machine-readable code identifying the kind of finding.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    pub message: String,
}
1213
/// Errors surfaced while loading, validating, or executing a YAML workflow.
///
/// Display strings come from the `#[error(...)]` attribute on each variant.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("rejected workflow yaml '{path}': {reason}")]
    FileRejected { path: String, reason: String },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1260
/// Receiver for `YamlWorkflowEvent`s emitted during workflow execution.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Delivers one event to the sink.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Cooperative cancellation probe; defaults to "never cancelled".
    fn is_cancelled(&self) -> bool {
        false
    }
}
1268
/// Event sink that discards every event.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1274
/// Shared wording for sink-driven cancellation so every code path reports
/// the identical message.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const MESSAGE: &str = "workflow event callback cancelled";
    MESSAGE
}
1278
1279fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1280 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1281}
1282
/// Reasons a YAML workflow cannot be converted to the canonical IR.
///
/// `UnsupportedNode` and `MultipleOutgoingEdge` are treated by
/// `try_run_yaml_via_ir_runtime` as "fall back to the YAML interpreter".
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1292
1293pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1295 if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1296 return workflow_to_mermaid(&ir);
1297 }
1298
1299 yaml_workflow_to_mermaid_fallback(workflow)
1300}
1301
/// Renders a YAML workflow directly to Mermaid when IR conversion fails.
///
/// Emits a `flowchart TD` with one rectangular node per workflow node,
/// rounded satellite nodes for each LLM tool (dotted edges), labeled edges
/// for switch branches, and a shared `classDef` for tool styling. Edges are
/// deduplicated via a set and sorted so output is deterministic.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    // One node per workflow node, plus satellite nodes for LLM tools.
    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            " {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    " {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    " {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Collect (from, label, to) triples; the set removes duplicate edges.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labeled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output across runs.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                " {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                " {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    if !tool_node_ids.is_empty() {
        lines.push(" classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!(" class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1383
1384fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1385 llm.tools
1386 .iter()
1387 .map(|tool| match tool {
1388 YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1389 YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1390 })
1391 .collect()
1392}
1393
/// Loads a workflow YAML file and renders it to Mermaid, inlining any
/// referenced sibling subgraph workflows as separate clusters.
///
/// # Errors
/// Propagates read/parse/rejection errors from `load_workflow_yaml_file`
/// and from sibling-workflow discovery.
pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
    let (canonical_path, workflow) = load_workflow_yaml_file(workflow_path)?;

    // Without referenced subgraphs, a plain single-workflow rendering is enough.
    let referenced_subgraphs = discover_referenced_subgraphs(canonical_path.as_path(), &workflow)?;
    if referenced_subgraphs.is_empty() {
        return Ok(yaml_workflow_to_mermaid(&workflow));
    }

    Ok(yaml_workflow_to_mermaid_with_subgraphs(
        &workflow,
        &referenced_subgraphs,
    ))
}
1408
/// A sibling workflow rendered as a Mermaid subgraph, plus the alias by
/// which the main workflow references it.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    alias: String,
    workflow: YamlWorkflow,
}
1414
/// Intermediate output of `render_mermaid_block`: the body lines of one
/// rendered cluster plus node ids needed later for styling and cross-links.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    lines: Vec<String>,
    // All tool satellite node ids (for the shared classDef styling).
    tool_node_ids: Vec<String>,
    // Tool nodes named `run_workflow_graph`, linked to subgraph entry nodes.
    run_workflow_tool_node_ids: Vec<String>,
    // Prefixed id of the workflow's entry node.
    entry_node_id: String,
}
1422
/// Renders the main workflow plus referenced subgraph workflows as one
/// Mermaid flowchart with nested `subgraph` clusters.
///
/// Each `run_workflow_graph` tool node in the main block gets a dotted
/// "calls" edge to every subgraph's entry node.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Render the main workflow inside its own cluster.
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        " subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!(" {line}"));
    }
    lines.push(" end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    // Render each referenced workflow as its own numbered cluster.
    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            " subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!(" {line}"));
        }
        lines.push(" end".to_string());

        // Dotted cross-links from run_workflow_graph tools to the subgraph.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                " {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    if !all_tool_nodes.is_empty() {
        lines.push(" classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!(" class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1473
/// Renders one workflow as the body of a Mermaid cluster, namespacing every
/// node id with `prefix` so multiple blocks can coexist in a single diagram.
///
/// Mirrors `yaml_workflow_to_mermaid_fallback`, additionally recording tool
/// node ids and `run_workflow_graph` tool ids for cross-cluster linking.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    // One node per workflow node, plus satellite nodes for LLM tools.
    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                // run_workflow_graph tools are linked to subgraph clusters later.
                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // Collect (from, label, to) triples; the set removes duplicate edges.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute labeled branch edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output across runs.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1549
/// Resolves the sibling YAML workflows referenced by this workflow's
/// `run_workflow_graph` tools, preserving first-reference order.
///
/// References that match no sibling file are silently skipped; only the
/// first sibling matching each id is used.
fn discover_referenced_subgraphs(
    workflow_path: &Path,
    workflow: &YamlWorkflow,
) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
    let workflow_ids = referenced_workflow_ids(workflow);
    if workflow_ids.is_empty() {
        return Ok(Vec::new());
    }

    // Candidate subgraphs live next to the main workflow file.
    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;

    let mut discovered = Vec::new();
    let mut seen = HashSet::new();

    for workflow_id in workflow_ids {
        if seen.contains(&workflow_id) {
            continue;
        }

        if let Some((_, subworkflow)) = sibling_workflows
            .iter()
            .find(|(key, _)| key == &workflow_id)
        {
            discovered.push(MermaidSubgraphWorkflow {
                alias: workflow_id.clone(),
                workflow: subworkflow.clone(),
            });
            seen.insert(workflow_id);
        }
    }

    Ok(discovered)
}
1584
/// Loads every other YAML workflow in `parent_dir`, keyed both by file stem
/// and by the workflow's declared id (either may be used as a reference).
///
/// The target workflow itself is excluded, compared via canonicalized paths.
/// Directory iteration order is platform-dependent, so callers should not
/// rely on the ordering of entries with duplicate keys.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let canonical_target =
        std::fs::canonicalize(workflow_path).unwrap_or_else(|_| workflow_path.to_path_buf());
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // Skip the workflow being rendered itself.
        let canonical_path = std::fs::canonicalize(&path).unwrap_or(path.clone());
        if canonical_path == canonical_target {
            continue;
        }

        let (_, subworkflow) = load_workflow_yaml_file(path.as_path())?;

        // Register under the file stem (if printable) and the workflow id.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((stem.to_string(), subworkflow.clone()));
        }
        results.push((subworkflow.id.clone(), subworkflow));
    }

    Ok(results)
}
1622
/// Collects workflow ids referenced by `run_workflow_graph` tools, either
/// from the tool's JSON schema (`const`/`enum` on `workflow_id`) or from
/// `"workflow_id": "..."` snippets embedded in the node's prompt text.
///
/// Ids are deduplicated in first-seen order.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                // Only the run_workflow_graph tool can reference subgraphs.
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    if seen.insert(workflow_id.clone()) {
                        ids.push(workflow_id);
                    }
                }

                // The prompt may spell out ids the schema does not constrain.
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        if seen.insert(workflow_id.clone()) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1658
1659fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1660 let schema = match tool {
1661 YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1662 YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1663 };
1664
1665 let mut ids = Vec::new();
1666 if let Some(schema) = schema {
1667 if let Some(workflow_prop) = schema
1668 .get("properties")
1669 .and_then(Value::as_object)
1670 .and_then(|properties| properties.get("workflow_id"))
1671 {
1672 if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1673 ids.push(value.to_string());
1674 }
1675
1676 if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1677 for value in enum_values {
1678 if let Some(value) = value.as_str() {
1679 ids.push(value.to_string());
1680 }
1681 }
1682 }
1683 }
1684 }
1685
1686 ids
1687}
1688
/// Scans free-form prompt text for `"workflow_id": "<id>"` occurrences and
/// returns the quoted ids in order of appearance (not deduplicated).
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut ids = Vec::new();
    let mut search = prompt;

    loop {
        let Some(key_at) = search.find(KEY) else {
            break;
        };
        let after_key = &search[key_at + KEY.len()..];
        let Some(colon_at) = after_key.find(':') else {
            break;
        };
        let after_colon = &after_key[colon_at + 1..];

        // Only quoted values count; anything else skips to the next match.
        let value = after_colon.trim_start();
        if let Some(quoted) = value.strip_prefix('"') {
            if let Some(close_at) = quoted.find('"') {
                let workflow_id = quoted[..close_at].trim();
                if !workflow_id.is_empty() {
                    ids.push(workflow_id.to_string());
                }
                search = &quoted[close_at + 1..];
                continue;
            }
        }

        search = after_colon;
    }

    ids
}
1716
/// True when the path carries a lowercase `.yaml` or `.yml` extension.
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1723
1724fn yaml_value_depth(value: &serde_yaml::Value, depth: usize) -> usize {
1725 match value {
1726 serde_yaml::Value::Sequence(items) => items
1727 .iter()
1728 .map(|item| yaml_value_depth(item, depth + 1))
1729 .max()
1730 .unwrap_or(depth),
1731 serde_yaml::Value::Mapping(map) => map
1732 .values()
1733 .map(|item| yaml_value_depth(item, depth + 1))
1734 .max()
1735 .unwrap_or(depth),
1736 _ => depth,
1737 }
1738}
1739
/// Loads and validates a workflow YAML file, returning its canonical path
/// together with the parsed workflow.
///
/// Rejection rules: the path must resolve to a regular file with a
/// `.yaml`/`.yml` extension, no larger than `MAX_WORKFLOW_YAML_BYTES`, and
/// the parsed YAML's nesting depth must not exceed
/// `MAX_WORKFLOW_YAML_DEPTH`.
///
/// # Errors
/// `Read` for filesystem failures, `Parse` for invalid YAML, and
/// `FileRejected` for any violated rule above.
fn load_workflow_yaml_file(
    workflow_path: &Path,
) -> Result<(PathBuf, YamlWorkflow), YamlWorkflowRunError> {
    let canonical_path =
        std::fs::canonicalize(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let metadata =
        std::fs::metadata(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
            path: canonical_path.display().to_string(),
            source,
        })?;

    if !metadata.is_file() {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: "path must reference a regular file".to_string(),
        });
    }

    if !is_yaml_file(&canonical_path) {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: "workflow file extension must be .yaml or .yml".to_string(),
        });
    }

    // Size check happens before reading so oversized files are never loaded.
    if metadata.len() > MAX_WORKFLOW_YAML_BYTES {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: format!(
                "workflow yaml is too large ({} bytes > {} bytes)",
                metadata.len(),
                MAX_WORKFLOW_YAML_BYTES
            ),
        });
    }

    let contents =
        std::fs::read_to_string(&canonical_path).map_err(|source| YamlWorkflowRunError::Read {
            path: canonical_path.display().to_string(),
            source,
        })?;

    // Parse generically first so depth can be checked before typed decoding.
    let yaml_value: serde_yaml::Value =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: canonical_path.display().to_string(),
            source,
        })?;

    let depth = yaml_value_depth(&yaml_value, 1);
    if depth > MAX_WORKFLOW_YAML_DEPTH {
        return Err(YamlWorkflowRunError::FileRejected {
            path: canonical_path.display().to_string(),
            reason: format!(
                "workflow yaml nesting depth {} exceeds limit {}",
                depth, MAX_WORKFLOW_YAML_DEPTH
            ),
        });
    }

    let workflow: YamlWorkflow =
        serde_yaml::from_value(yaml_value).map_err(|source| YamlWorkflowRunError::Parse {
            path: canonical_path.display().to_string(),
            source,
        })?;

    Ok((canonical_path, workflow))
}
1811
1812fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1813 sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1814}
1815
1816fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
1817 match tool {
1818 YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
1819 YamlToolDeclaration::Simplified(simple) => &simple.name,
1820 }
1821}
1822
/// Converts a YAML workflow into the canonical IR `WorkflowDefinition`.
///
/// A synthetic start node pointing at the YAML entry node is prepended.
/// `llm_call` nodes become IR Tool nodes invoking the built-in YAML LLM
/// tool; `custom_worker` nodes become Tool nodes for their handler; `switch`
/// nodes become Router nodes with their conditions rewritten to IR paths.
///
/// # Errors
/// Features the IR cannot represent yet (set/update_globals, llm tools,
/// several outgoing edges) are reported as `UnsupportedNode` /
/// `MultipleOutgoingEdge` so callers can fall back to the YAML interpreter;
/// a missing entry node yields `MissingEntry`.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of YAML edges, used to derive each node's single `next`.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    // +1 for the synthetic start node.
    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation yet.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            // LLM nodes map to the built-in YAML LLM tool, with the node's
            // configuration flattened into the tool input payload.
            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "max_tokens": llm.max_tokens,
                        "temperature": llm.temperature,
                        "top_p": llm.top_p,
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Globals mutation has no IR representation yet (see above).
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: {
                        // Smuggle the handler file path inside the payload so
                        // the tool executor can locate the implementation.
                        let mut payload = node
                            .config
                            .as_ref()
                            .and_then(|c| c.payload.clone())
                            .unwrap_or_else(|| json!({}));
                        if let Some(payload_obj) = payload.as_object_mut() {
                            payload_obj.insert(
                                "__handler_file".to_string(),
                                worker
                                    .handler_file
                                    .as_ref()
                                    .map(|value| Value::String(value.clone()))
                                    .unwrap_or(Value::Null),
                            );
                        }
                        payload
                    },
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch branches become router routes with IR-path conditions.
            let mut routes = Vec::with_capacity(switch.branches.len());
            for branch in &switch.branches {
                let rewritten =
                    rewrite_yaml_condition_to_ir(&branch.condition).map_err(|reason| {
                        YamlToIrError::UnsupportedNode {
                            node_id: node.id.clone(),
                            reason: format!(
                                "failed to rewrite switch condition '{}': {}",
                                branch.condition, reason
                            ),
                        }
                    })?;
                routes.push(RouterRoute {
                    when: rewritten,
                    next: branch.target.clone(),
                });
            }
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes,
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1996
1997fn single_next_for_node(
1998 outgoing: &HashMap<&str, Vec<&str>>,
1999 node_id: &str,
2000) -> Result<Option<String>, YamlToIrError> {
2001 match outgoing.get(node_id) {
2002 None => Ok(None),
2003 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
2004 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
2005 node_id: node_id.to_string(),
2006 }),
2007 }
2008}
2009
2010fn rewrite_yaml_condition_to_ir(expr: &str) -> Result<String, String> {
2011 let chars: Vec<char> = expr.chars().collect();
2012 let mut out = String::with_capacity(expr.len());
2013 let mut index = 0usize;
2014 let mut quote_context: Option<char> = None;
2015 let mut escaped = false;
2016
2017 while index < chars.len() {
2018 let current = chars[index];
2019
2020 if let Some(quote) = quote_context {
2021 out.push(current);
2022 if escaped {
2023 escaped = false;
2024 } else if current == '\\' {
2025 escaped = true;
2026 } else if current == quote {
2027 quote_context = None;
2028 }
2029 index += 1;
2030 continue;
2031 }
2032
2033 if current == '"' || current == '\'' {
2034 quote_context = Some(current);
2035 out.push(current);
2036 index += 1;
2037 continue;
2038 }
2039
2040 if starts_with_chars(&chars, index, "$.nodes.") {
2041 out.push_str("$.node_outputs.");
2042 index += "$.nodes.".chars().count();
2043 continue;
2044 }
2045
2046 if starts_with_chars(&chars, index, ".output")
2047 && is_output_segment_boundary(chars.get(index + ".output".chars().count()).copied())
2048 {
2049 index += ".output".chars().count();
2050 continue;
2051 }
2052
2053 out.push(current);
2054 index += 1;
2055 }
2056
2057 if quote_context.is_some() {
2058 return Err("condition contains an unterminated quoted string".to_string());
2059 }
2060
2061 Ok(out)
2062}
2063
/// Returns true when `chars[start..]` begins with the characters of
/// `pattern`.
///
/// Walks both sequences in lockstep instead of collecting `pattern` into a
/// temporary `Vec<char>` per call — this runs once per character position in
/// `rewrite_yaml_condition_to_ir`, so the old allocation was pure overhead.
fn starts_with_chars(chars: &[char], start: usize, pattern: &str) -> bool {
    // Match the original's behavior: an out-of-range start never matches,
    // even for an empty pattern.
    if start > chars.len() {
        return false;
    }
    let mut index = start;
    for expected in pattern.chars() {
        match chars.get(index) {
            Some(&actual) if actual == expected => index += 1,
            _ => return false,
        }
    }
    true
}
2071
/// Decides whether the character after `.output` ends the path segment.
///
/// End-of-input counts as a boundary; otherwise whitespace or a small set of
/// expression delimiters/operators does.
fn is_output_segment_boundary(next: Option<char>) -> bool {
    let Some(ch) = next else {
        return true;
    };
    ch.is_whitespace()
        || matches!(
            ch,
            '.' | ')' | ']' | '}' | ',' | '=' | '!' | '<' | '>' | '&' | '|'
        )
}
2084
/// Makes an arbitrary node id safe for use as a Mermaid identifier.
///
/// Ids that do not begin with an ASCII letter or underscore are prefixed
/// with `n_`; every character outside `[A-Za-z0-9_]` becomes `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_legally = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut sanitized = String::with_capacity(id.len() + 2);
    if !starts_legally {
        sanitized.push_str("n_");
    }
    for ch in id.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            sanitized.push(ch);
        } else {
            sanitized.push('_');
        }
    }
    sanitized
}
2108
/// Backslash-escapes double quotes, which would otherwise terminate a
/// Mermaid `"label"` early.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push_str("\\\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
2112
/// Fully-resolved request handed to a `YamlWorkflowLlmExecutor` for one
/// llm_call node.
///
/// NOTE(review): field semantics mirror the YAML node config fields of the
/// same names — confirm details against the execute/llm_tools modules.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    // Node identity and streaming behavior.
    pub node_id: String,
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    // Model and sampling parameters.
    pub model: String,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f32>,
    pub top_p: Option<f32>,
    // Conversation input: explicit messages and/or the rendered prompt.
    pub messages: Option<Vec<Message>>,
    pub append_prompt_as_user: bool,
    pub prompt: String,
    pub prompt_template: String,
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    // Expected output schema and parsing behavior.
    pub schema: Value,
    pub stream: bool,
    pub heal: bool,
    // Tool-calling configuration.
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    // Workflow context and tracing metadata.
    pub execution_context: Value,
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub tenant_context: YamlWorkflowTraceTenantContext,
    pub trace_sampled: bool,
}
2142
/// A tool definition resolved from YAML, paired with an optional schema for
/// the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    pub output_schema: Option<Value>,
}
2148
/// LLM backend used by YAML workflow llm_call nodes.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    /// Runs one structured completion described by `request`, optionally
    /// emitting streaming events into `event_sink`. Errors are opaque
    /// strings so implementations stay transport-agnostic.
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
2157
/// Executes custom_worker nodes outside the LLM path.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Invokes `handler` (optionally backed by `handler_file`) with the
    /// node's payload, the workflow's email text, and the execution context;
    /// returns the worker's JSON output or an opaque error string.
    async fn execute(
        &self,
        handler: &str,
        handler_file: Option<&str>,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
2169
/// Runs a YAML workflow against a borrowed `SimpleAgentsClient`.
///
/// Adapts the client (plus the optional custom-worker executor and a clone
/// of the run options) into a `BorrowedClientExecutor`, then delegates to
/// the executor-based entry point below.
pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let executor = BorrowedClientExecutor {
        client,
        custom_worker,
        run_options: options.clone(),
    };
    run_workflow_yaml_with_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        &executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2193
/// Runs `workflow` against an arbitrary [`YamlWorkflowLlmExecutor`].
///
/// Public facade over the implementation in the `execute` submodule; all
/// arguments are forwarded unchanged.
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    execute::run_workflow_yaml_with_custom_worker_and_events_and_options_impl(
        workflow,
        workflow_input,
        executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
2212
/// Attempts to execute a YAML workflow through the typed IR runtime.
///
/// Returns `Ok(None)` when the workflow cannot (or should not) be run via the
/// IR path — unsupported node kinds, multiple outgoing edges, or IR
/// validation failure — so the caller can fall back to the legacy executor.
/// Returns `Ok(Some(output))` on success and `Err` on real failures.
///
/// LLM and custom-worker nodes are bridged into the IR runtime's
/// `ToolExecutor` interface via the local `YamlIrToolExecutor` adapter; the
/// IR runtime's own `LlmExecutor` slot is filled with a `NoopLlm` that always
/// errors, because all LLM work goes through the tool path here.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lowering failures that merely mean "IR path not applicable" become
    // Ok(None); anything else is a genuine input error.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    // A workflow that lowers but fails IR validation also falls back.
    if validate_and_normalize(&ir).is_err() {
        return Ok(None);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open the top-level workflow span only when this run is sampled.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // Placeholder LLM executor: the IR produced here routes every LLM call
    // through the tool path, so a direct LLM execution indicates a bug.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Adapter that serves IR tool calls either as YAML LLM node executions
    // (when the tool id is YAML_LLM_TOOL_ID) or as custom-worker invocations.
    // Mutexes accumulate per-node usage/model metrics across async calls.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // Branch 1: the "tool" is actually a YAML LLM node encoded as a
            // tool call; rebuild a YamlLlmExecutionRequest from its payload.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let max_tokens = input
                    .input
                    .get("max_tokens")
                    .and_then(Value::as_u64)
                    .and_then(|value| u32::try_from(value).ok());
                let temperature = input
                    .input
                    .get("temperature")
                    .and_then(Value::as_f64)
                    .map(|value| value as f32);
                let top_p = input
                    .input
                    .get("top_p")
                    .and_then(Value::as_f64)
                    .map(|value| value as f32);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                // Defaults to true: prompt is appended even when explicit
                // messages are supplied, unless the node opts out.
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // Tool-calling is intentionally disabled on this path
                // (empty tools, single roundtrip, tracing off).
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    max_tokens,
                    temperature,
                    top_p,
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    tenant_context: self.tenant_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record usage/model metrics on success; lock poisoning is
                // deliberately ignored (metrics are best-effort).
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Branch 2: any other tool id is a custom-worker handler.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            // "__handler_file" is transport metadata, not worker payload:
            // strip it out before invoking the handler.
            let mut payload = input.input.clone();
            let handler_file = payload
                .as_object_mut()
                .and_then(|obj| obj.remove("__handler_file"))
                .and_then(|value| value.as_str().map(ToString::to_string));
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-handler span, again only when sampled.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                apply_trace_identity_attributes(span.as_mut(), self.trace_id.as_deref());
                span.set_attribute("handler_name", input.tool.as_str());
                apply_trace_tenant_attributes_from_tenant(span.as_mut(), &self.tenant_context);
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Build the trace context the worker sees, merging the handler
            // span (if any) with the inherited trace identity.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(
                    &input.tool,
                    handler_file.as_deref(),
                    &payload,
                    email_text,
                    &worker_context,
                )
                .await
                .map_err(ToolExecutionError::Failed);

            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    // IR was already validated above, so skip the runtime's own pass.
    let runtime_options = WorkflowRuntimeOptions {
        validate_before_run: false,
        ..WorkflowRuntimeOptions::default()
    };
    let runtime = WorkflowRuntime::new(ir, &NoopLlm, Some(&tool_executor), runtime_options);

    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        // Late validation failure still falls back to the legacy path.
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Shape IR node outputs like the legacy runner: {"output": <value>},
    // omitting the synthetic start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    // Assemble trace/step timings; per-node elapsed time is not measured on
    // this path, so elapsed_ms is reported as 0.
    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        let node_id = execution.node_id;
        trace.push(node_id.clone());
        let usage = node_usage_map.get(&node_id);
        if let Some(usage) = usage {
            llm_node_metrics.insert(
                node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: node_id.clone(),
            node_kind: "ir_runtime".to_string(),
            model_name: llm_node_models.get(&node_id).cloned(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        // Time-to-first-token is not tracked by the IR path.
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    // Finalize and flush the workflow span, attaching input/output and
    // token-stat attributes for Langfuse.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_langfuse_trace_input_output_attributes(
            span.as_mut(),
            workflow_input,
            &output,
            options.telemetry.payload_mode,
        );
        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(output))
}
2665
/// Executes a `run_workflow_graph` tool call: runs another YAML workflow
/// (looked up by id in `input.workflow_registry`) as a sub-workflow.
///
/// `payload` must carry `workflow_id` and may carry an `input` object for the
/// sub-run; `context.input` supplies the registry, inherited `messages` /
/// `email_text`, and recursion-depth counters. Depth is limited via the
/// `__subgraph_depth` / `__subgraph_max_depth` keys (default max 3).
/// Returns a summary object with the sub-workflow's terminal node/output and
/// trace; errors are plain strings for the tool-call channel.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Recursion guard: depth counters travel inside the workflow input.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    // Inherit messages/email_text/registry from the parent unless the
    // caller explicitly provided them in the sub-workflow input.
    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Always overwrite the depth counters for the child run.
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    // Sub-run gets no event sink; events are not propagated upward here.
    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
2775
2776fn build_subworkflow_options(
2777 parent_options: &YamlWorkflowRunOptions,
2778 parent_trace_context: Option<&TraceContext>,
2779 resolved_trace_id: Option<&str>,
2780) -> YamlWorkflowRunOptions {
2781 let mut subworkflow_options = parent_options.clone();
2782 if parent_trace_context.is_some() || resolved_trace_id.is_some() {
2783 let trace_context = YamlWorkflowTraceContextInput {
2784 trace_id: resolved_trace_id
2785 .map(|value| value.to_string())
2786 .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
2787 span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
2788 parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
2789 traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
2790 tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
2791 baggage: parent_trace_context
2792 .map(|ctx| ctx.baggage.clone())
2793 .unwrap_or_default(),
2794 };
2795 subworkflow_options.trace.context = Some(trace_context);
2796 }
2797 subworkflow_options
2798}
2799
/// Top-level deserialized YAML workflow: a graph of nodes and edges with a
/// designated entry node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    pub id: String,
    /// Id of the node where execution begins.
    pub entry_node: String,
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}
2809
/// One node of a YAML workflow: its id, which kind it is (`node_type`), and
/// optional kind-specific configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    pub id: String,
    pub node_type: YamlNodeType,
    pub config: Option<YamlNodeConfig>,
}
2816
2817impl YamlNode {
2818 fn kind_name(&self) -> &'static str {
2819 if self.node_type.llm_call.is_some() {
2820 "llm_call"
2821 } else if self.node_type.switch.is_some() {
2822 "switch"
2823 } else if self.node_type.custom_worker.is_some() {
2824 "custom_worker"
2825 } else if self.node_type.end.is_some() {
2826 "end"
2827 } else {
2828 "unknown"
2829 }
2830 }
2831}
2832
/// Discriminates a node's kind by which single field is populated.
///
/// NOTE(review): the struct itself does not enforce that exactly one field is
/// set — presumably validation elsewhere does; confirm before relying on it.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct YamlNodeType {
    pub llm_call: Option<YamlLlmCall>,
    pub switch: Option<YamlSwitch>,
    pub custom_worker: Option<YamlCustomWorker>,
    /// End-node marker; payload shape is unconstrained here.
    pub end: Option<Value>,
}
2841
/// Configuration of an `llm_call` node: model, sampling knobs, streaming and
/// healing flags, message sourcing, and tool-calling setup.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct YamlLlmCall {
    pub model: String,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f32>,
    pub top_p: Option<f32>,
    pub stream: Option<bool>,
    /// When streaming, emit JSON output as plain text chunks.
    pub stream_json_as_text: Option<bool>,
    /// Enable JSON healing of malformed model output.
    pub heal: Option<bool>,
    /// Context path to read a pre-built message list from.
    pub messages_path: Option<String>,
    pub append_prompt_as_user: Option<bool>,
    /// Which declaration syntax `tools` uses (OpenAI vs simplified).
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    pub tool_choice: Option<YamlToolChoiceConfig>,
    pub max_tool_roundtrips: Option<u8>,
    /// Global key under which this node's tool calls are recorded.
    pub tool_calls_global_key: Option<String>,
}
2862
/// Syntax used for tool declarations in YAML; defaults to the OpenAI shape.
/// Serialized in snake_case (`openai` / `simplified`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    #[default]
    Openai,
    Simplified,
}
2870
/// A tool declaration in either supported syntax. Untagged: serde tries the
/// OpenAI shape first, then the simplified shape.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}
2877
/// OpenAI-style tool declaration: `{ "type": ..., "function": { ... } }`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// The YAML key is `type`; optional in the source document.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
2884
/// Function portion of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    /// JSON schema of the tool's input parameters.
    pub parameters: Option<Value>,
    /// JSON schema of the tool's result, when declared.
    pub output_schema: Option<Value>,
}
2892
/// Simplified (flat) tool declaration: name/description plus input and
/// optional output schemas, without the OpenAI wrapper object.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}
2900
/// Tool-choice configuration in any of three accepted shapes. Untagged:
/// serde tries a bare mode string, then `{ function: "name" }`, then the
/// full OpenAI tool-choice object.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
2908
/// Configuration of a `switch` node: ordered conditional branches plus a
/// mandatory default target.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id taken when no branch condition matches.
    pub default: String,
}
2915
/// One branch of a switch node: a condition expression and the node id to
/// jump to when it holds.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}
2921
/// Configuration of a `custom_worker` node: the handler name passed to the
/// host's worker executor, plus an optional file that defines it.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct YamlCustomWorker {
    pub handler: String,
    pub handler_file: Option<String>,
}
2928
/// Kind-specific node configuration shared by node types: prompt/schema for
/// LLM nodes, payload for workers, and global-variable mutations.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    pub prompt: Option<String>,
    /// Output schema; the YAML key may be either `output_schema` or `schema`.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    pub payload: Option<Value>,
    /// Globals to set to interpolated string values after this node runs.
    pub set_globals: Option<HashMap<String, String>>,
    /// Globals to mutate via an update operation (see [`YamlGlobalUpdate`]).
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
2938
/// One global-variable update operation.
///
/// `op` selects the operation (stringly-typed here; interpreted elsewhere);
/// `from` presumably names a context path source and `by` a numeric delta —
/// TODO confirm against the interpreter.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    pub op: String,
    pub from: Option<String>,
    pub by: Option<f64>,
}
2945
/// A directed edge between two nodes, by node id.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
2951
2952#[cfg(test)]
2953mod tests {
2954 use super::*;
2955 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
2956 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
2957 use simple_agent_type::tool::{ToolCallFunction, ToolType};
2958 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
2959 use simple_agents_core::SimpleAgentsClientBuilder;
2960 use std::collections::BTreeMap;
2961 use std::fs;
2962 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2963 use std::sync::{Arc, Mutex, OnceLock};
2964
2965 fn stream_debug_env_lock() -> &'static Mutex<()> {
2966 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2967 LOCK.get_or_init(|| Mutex::new(()))
2968 }
2969
    // ---- Test fixtures -------------------------------------------------

    /// Stateless executor fixture (its trait impl lives elsewhere in this
    /// test module).
    struct MockExecutor;

    /// Event sink that records every emitted workflow event for assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    /// Event sink that reports itself cancelled after the first emission.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }

    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        fn emit(&self, _event: &YamlWorkflowEvent) {
            // Any emission flips the flag; later is_cancelled() calls see it.
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }

    /// Executor fixture counting how many times it is invoked.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    /// Worker fixture capturing the context value it was called with.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    /// Worker fixture capturing the `handler_file` it was called with.
    struct HandlerFileCapturingWorker {
        handler_file: Mutex<Option<String>>,
    }

    // Unit-struct provider fixtures; their `Provider` impls follow below.
    struct ToolLoopProvider;

    struct UnknownToolProvider;

    struct ReasoningUsageProvider;

    struct ToolLoopReasoningProvider;
3009
    /// Span fixture that records set attributes for telemetry assertions;
    /// events and span end are deliberately no-ops.
    #[derive(Default)]
    struct CapturingSpan {
        attributes: BTreeMap<String, String>,
    }

    impl WorkflowSpan for CapturingSpan {
        fn set_attribute(&mut self, key: &str, value: &str) {
            self.attributes.insert(key.to_string(), value.to_string());
        }

        // Events are ignored by this fixture.
        fn add_event(&mut self, _name: &str) {}

        fn end(self: Box<Self>) {}
    }
3024
    /// Mock provider driving a complete tool-call round trip:
    /// 1st call with tools -> requests `get_customer_context`;
    /// call containing a tool result -> final `{"state":"done"}`;
    /// no tools -> echoes the last user prompt as a JSON email payload.
    #[async_trait]
    impl Provider for ToolLoopProvider {
        fn name(&self) -> &str {
            "openai"
        }

        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            // Round-trip the request through its JSON body so execute() can
            // inspect it.
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            // A Tool-role message means a tool result already came back.
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // First leg: ask for the get_customer_context tool call.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Second leg: tool result received, finish with JSON.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Tool-free call: echo the most recent user prompt.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3120
    /// Mock provider that always answers with a call to a tool name the
    /// workflow never declared, for exercising unknown-tool error handling.
    #[async_trait]
    impl Provider for UnknownToolProvider {
        fn name(&self) -> &str {
            "openai"
        }

        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            // Unconditionally request "unknown_tool" with empty arguments.
            let response = CompletionResponse {
                id: "resp_unknown_tool".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
                        id: "call_unknown".to_string(),
                        tool_type: ToolType::Function,
                        function: ToolCallFunction {
                            name: "unknown_tool".to_string(),
                            arguments: "{}".to_string(),
                        },
                    }]),
                    finish_reason: FinishReason::ToolCalls,
                    logprobs: None,
                }],
                usage: Usage::new(5, 2),
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3166
    /// Mock provider returning a fixed completion whose usage includes
    /// reasoning tokens, for asserting reasoning-token accounting.
    #[async_trait]
    impl Provider for ReasoningUsageProvider {
        fn name(&self) -> &str {
            "openai"
        }

        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            // 9 prompt / 5 completion tokens, 4 of them reasoning.
            let mut usage = Usage::new(9, 5);
            usage.reasoning_tokens = Some(4);
            let response = CompletionResponse {
                id: "resp_reasoning".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("{\"state\":\"ok\"}"),
                    finish_reason: FinishReason::Stop,
                    logprobs: None,
                }],
                usage,
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3207
    /// Mock provider combining the tool-loop and reasoning-usage behaviors:
    /// first leg requests a tool call (2 reasoning tokens), any later leg
    /// finishes with JSON (3 reasoning tokens), so totals can be asserted.
    #[async_trait]
    impl Provider for ToolLoopReasoningProvider {
        fn name(&self) -> &str {
            "openai"
        }

        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            // A Tool-role message means a tool result already came back.
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // First leg: request the tool call, with reasoning usage.
                let mut usage = Usage::new(10, 5);
                usage.reasoning_tokens = Some(2);
                CompletionResponse {
                    id: "resp_tool_reasoning_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Final leg: finish with JSON, also with reasoning usage.
                let mut usage = Usage::new(12, 6);
                usage.reasoning_tokens = Some(3);
                CompletionResponse {
                    id: "resp_tool_reasoning_2".to_string(),
                    model: request.model,
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
3280
/// Test worker that returns the same fixed payload for every invocation.
struct FixedToolWorker {
    // JSON value handed back verbatim from `execute`.
    payload: Value,
}
3284
/// Test worker that counts how many times `execute` is invoked.
struct CountingToolWorker {
    // Incremented once per `execute` call.
    execute_calls: AtomicUsize,
}
3288
#[async_trait]
impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
    /// Ignores every input and returns a clone of the preconfigured payload.
    async fn execute(
        &self,
        _handler: &str,
        _handler_file: Option<&str>,
        _payload: &Value,
        _email_text: &str,
        _context: &Value,
    ) -> Result<Value, String> {
        Ok(self.payload.clone())
    }
}
3302
#[async_trait]
impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
    /// Records the invocation and returns a fixed success payload.
    async fn execute(
        &self,
        _handler: &str,
        _handler_file: Option<&str>,
        _payload: &Value,
        _email_text: &str,
        _context: &Value,
    ) -> Result<Value, String> {
        self.execute_calls.fetch_add(1, Ordering::SeqCst);
        Ok(json!({"ok": true}))
    }
}
3317
3318 #[async_trait]
3319 impl YamlWorkflowLlmExecutor for CountingExecutor {
3320 async fn complete_structured(
3321 &self,
3322 _request: YamlLlmExecutionRequest,
3323 _event_sink: Option<&dyn YamlWorkflowEventSink>,
3324 ) -> Result<YamlLlmExecutionResult, String> {
3325 self.call_count.fetch_add(1, Ordering::SeqCst);
3326 Ok(YamlLlmExecutionResult {
3327 payload: json!({"state":"ok"}),
3328 usage: None,
3329 ttft_ms: None,
3330 tool_calls: Vec::new(),
3331 })
3332 }
3333 }
3334
impl YamlWorkflowEventSink for RecordingSink {
    /// Appends a copy of each emitted event to the shared event log.
    fn emit(&self, event: &YamlWorkflowEvent) {
        self.events
            .lock()
            .expect("recording sink lock should not be poisoned")
            .push(event.clone());
    }
}
3343
3344 #[async_trait]
3345 impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
3346 async fn execute(
3347 &self,
3348 _handler: &str,
3349 _handler_file: Option<&str>,
3350 _payload: &Value,
3351 _email_text: &str,
3352 context: &Value,
3353 ) -> Result<Value, String> {
3354 let mut guard = self
3355 .context
3356 .lock()
3357 .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
3358 *guard = Some(context.clone());
3359 Ok(json!({"ok": true}))
3360 }
3361 }
3362
3363 #[async_trait]
3364 impl YamlWorkflowCustomWorkerExecutor for HandlerFileCapturingWorker {
3365 async fn execute(
3366 &self,
3367 _handler: &str,
3368 handler_file: Option<&str>,
3369 _payload: &Value,
3370 _email_text: &str,
3371 _context: &Value,
3372 ) -> Result<Value, String> {
3373 let mut guard = self
3374 .handler_file
3375 .lock()
3376 .map_err(|_| "handler-file lock should not be poisoned".to_string())?;
3377 *guard = handler_file.map(ToString::to_string);
3378 Ok(json!({"ok": true}))
3379 }
3380 }
3381
3382 #[async_trait]
3383 impl YamlWorkflowLlmExecutor for MockExecutor {
3384 async fn complete_structured(
3385 &self,
3386 request: YamlLlmExecutionRequest,
3387 _event_sink: Option<&dyn YamlWorkflowEventSink>,
3388 ) -> Result<YamlLlmExecutionResult, String> {
3389 let prompt = request.prompt;
3390 if prompt.contains("exactly one category") {
3391 return Ok(YamlLlmExecutionResult {
3392 payload: json!({"category":"termination","reason":"mock"}),
3393 usage: Some(YamlLlmTokenUsage {
3394 prompt_tokens: 10,
3395 completion_tokens: 5,
3396 total_tokens: 15,
3397 reasoning_tokens: None,
3398 }),
3399 ttft_ms: None,
3400 tool_calls: Vec::new(),
3401 });
3402 }
3403 if prompt.contains("Determine termination subtype") {
3404 return Ok(YamlLlmExecutionResult {
3405 payload: json!({"subtype":"repeated_offense","reason":"mock"}),
3406 usage: Some(YamlLlmTokenUsage {
3407 prompt_tokens: 12,
3408 completion_tokens: 6,
3409 total_tokens: 18,
3410 reasoning_tokens: None,
3411 }),
3412 ttft_ms: None,
3413 tool_calls: Vec::new(),
3414 });
3415 }
3416 if prompt.contains("Determine supply chain subtype") {
3417 return Ok(YamlLlmExecutionResult {
3418 payload: json!({"subtype":"order_replacement","reason":"mock"}),
3419 usage: Some(YamlLlmTokenUsage {
3420 prompt_tokens: 11,
3421 completion_tokens: 4,
3422 total_tokens: 15,
3423 reasoning_tokens: None,
3424 }),
3425 ttft_ms: None,
3426 tool_calls: Vec::new(),
3427 });
3428 }
3429 Err("unexpected prompt".to_string())
3430 }
3431 }
3432
/// End-to-end run of a multi-node YAML workflow: classify -> route ->
/// subtype classify -> route -> RAG worker. Verifies routing lands on the
/// repeated-offense branch and that timings and token totals aggregate.
#[tokio::test]
async fn runs_yaml_workflow_and_returns_step_timings() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Determine termination subtype:
            {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // The fixed worker payload stands in for retrieved RAG context.
    let custom_worker = FixedToolWorker {
        payload: json!({"context": "mock"}),
    };
    let output = run_email_workflow_yaml_with_custom_worker(
        &workflow,
        "test",
        &MockExecutor,
        Some(&custom_worker),
    )
    .await
    .expect("yaml workflow should execute");

    assert_eq!(output.workflow_id, "email-intake-classification");
    assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
    assert!(!output.step_timings.is_empty());
    // One timing entry per traced node.
    assert_eq!(output.step_timings.len(), output.trace.len());
    assert!(output
        .outputs
        .contains_key("rag_termination_repeated_offense"));
    // Totals are the sums of the two MockExecutor calls (10+12, 5+6, 15+18).
    assert_eq!(output.total_input_tokens, 22);
    assert_eq!(output.total_output_tokens, 11);
    assert_eq!(output.total_tokens, 33);
}
3514
3515 #[tokio::test]
3516 async fn workflow_runner_executes_inline_workflow_with_executor() {
3517 let yaml = r#"
3518id: email-intake-classification
3519entry_node: classify_top_level
3520nodes:
3521 - id: classify_top_level
3522 node_type:
3523 llm_call:
3524 model: gpt-4.1
3525 config:
3526 prompt: |
3527 Classify this email into exactly one category:
3528 {{ input.email_text }}
3529"#;
3530
3531 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3532 let output = WorkflowRunner::from_workflow(&workflow)
3533 .with_email_text("test")
3534 .with_executor(&MockExecutor)
3535 .run()
3536 .await
3537 .expect("builder execution should succeed");
3538
3539 assert_eq!(output.workflow_id, "email-intake-classification");
3540 assert!(output.outputs.contains_key("classify_top_level"));
3541 }
3542
3543 #[tokio::test]
3544 async fn workflow_runner_requires_executor() {
3545 let yaml = r#"
3546id: missing-executor
3547entry_node: end
3548nodes:
3549 - id: end
3550 node_type:
3551 end: {}
3552"#;
3553 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
3554
3555 let err = WorkflowRunner::from_workflow(&workflow)
3556 .with_input(&json!({"email_text":"x"}))
3557 .run()
3558 .await
3559 .expect_err("missing executor should fail");
3560
3561 assert!(matches!(
3562 err,
3563 YamlWorkflowRunError::InvalidInput { message }
3564 if message.contains("workflow executor is required")
3565 ));
3566 }
3567
/// A single-LLM-node workflow should emit a `node_llm_input_resolved`
/// telemetry event whose metadata records the model, the rendered prompt,
/// and exactly one resolved template binding.
#[tokio::test]
async fn emits_resolved_llm_input_event_with_bindings() {
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_email_workflow_yaml_with_custom_worker_and_events(
        &workflow,
        "Need help with termination",
        &MockExecutor,
        None,
        Some(&sink),
    )
    .await
    .expect("yaml workflow should execute");

    assert_eq!(output.terminal_node, "classify_top_level");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let llm_event = events
        .iter()
        .find(|event| event.event_type == "node_llm_input_resolved")
        .expect("expected llm input telemetry event");

    let metadata = llm_event
        .metadata
        .as_ref()
        .expect("llm input event must include metadata");
    assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
    assert_eq!(metadata["stream_requested"], Value::Bool(false));
    assert_eq!(metadata["heal_requested"], Value::Bool(false));
    // The recorded prompt must contain the substituted email text.
    assert!(metadata["prompt"]
        .as_str()
        .expect("prompt should be a string")
        .contains("Need help with termination"));

    // One binding: input.email_text resolved to the supplied email string.
    let bindings = metadata["bindings"]
        .as_array()
        .expect("bindings should be an array");
    assert_eq!(bindings.len(), 1);
    assert_eq!(
        bindings[0]["source_path"],
        Value::String("input.email_text".to_string())
    );
    assert_eq!(
        bindings[0]["resolved"],
        Value::String("Need help with termination".to_string())
    );
    assert_eq!(bindings[0]["missing"], Value::Bool(false));
    assert_eq!(
        bindings[0]["resolved_type"],
        Value::String("string".to_string())
    );
}
3640
/// With default run options, the `workflow_completed` event metadata must
/// include a `nerdstats` block carrying token totals and per-step details,
/// while the legacy `step_timings`/`llm_node_*` keys stay absent.
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_by_default() {
    let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // Nerdstats mirrors the run output's identifiers and totals.
    assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
    assert_eq!(
        nerdstats["terminal_node"],
        Value::String(output.terminal_node)
    );
    assert_eq!(
        nerdstats["total_tokens"],
        Value::Number(output.total_tokens.into())
    );
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_usage".to_string())
    );
    // Only the consolidated step_details key is published.
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert!(nerdstats.get("step_details").is_some());
    assert!(nerdstats.get("llm_node_models").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
    // Non-streaming run: no time-to-first-token.
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
}
3717
/// When telemetry nerdstats are disabled in the run options, the
/// `workflow_completed` event must carry no metadata at all.
#[tokio::test]
async fn workflow_completed_event_omits_nerdstats_when_disabled() {
    let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };
    // Same defaults as the happy-path test, with only nerdstats turned off.
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            nerdstats: false,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &options,
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    assert!(completed.metadata.is_none());
}
3767
/// Mock executor that reports a time-to-first-token only when the request
/// asked for streaming.
struct StreamAwareMockExecutor;
3769
3770 #[async_trait]
3771 impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
3772 async fn complete_structured(
3773 &self,
3774 request: YamlLlmExecutionRequest,
3775 _event_sink: Option<&dyn YamlWorkflowEventSink>,
3776 ) -> Result<YamlLlmExecutionResult, String> {
3777 Ok(YamlLlmExecutionResult {
3778 payload: json!({"state":"ok"}),
3779 usage: Some(YamlLlmTokenUsage {
3780 prompt_tokens: 20,
3781 completion_tokens: 10,
3782 total_tokens: 30,
3783 reasoning_tokens: None,
3784 }),
3785 ttft_ms: if request.stream { Some(12) } else { None },
3786 tool_calls: Vec::new(),
3787 })
3788 }
3789 }
3790
/// Streaming LLM nodes still report nerdstats when the provider supplies
/// usage, and the executor-reported TTFT is surfaced.
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
    let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &StreamAwareMockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // Usage came from StreamAwareMockExecutor (20/10/30) with ttft_ms = 12.
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
    assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
}
3854
/// When an LLM step carries no token usage (e.g. a stream that never
/// reported usage), nerdstats must flag token metrics as unavailable and
/// null out the totals instead of reporting misleading zeros.
#[test]
fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
    // Hand-built run output: one llm_call step with all token fields None.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1".to_string()),
            elapsed_ms: 100,
            prompt_tokens: None,
            completion_tokens: None,
            total_tokens: None,
            reasoning_tokens: None,
            tokens_per_second: None,
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: None,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_tokens: 0,
        total_reasoning_tokens: None,
        tokens_per_second: 0.0,
        trace_id: Some("trace-1".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_stream_usage_unavailable".to_string())
    );
    // Totals are null, not zero, when metrics are unavailable.
    assert_eq!(nerdstats["total_tokens"], Value::Null);
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
}
3908
/// Nerdstats must pass through a recorded time-to-first-token and keep the
/// per-step details while omitting the legacy timing/metric keys.
#[test]
fn workflow_nerdstats_includes_ttft_when_available() {
    // Hand-built run output: one llm_call step with full token usage.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1".to_string()),
            elapsed_ms: 100,
            prompt_tokens: Some(10),
            completion_tokens: Some(15),
            total_tokens: Some(25),
            reasoning_tokens: None,
            tokens_per_second: Some(150.0),
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(42),
        total_input_tokens: 10,
        total_output_tokens: 15,
        total_tokens: 25,
        total_reasoning_tokens: None,
        tokens_per_second: 150.0,
        trace_id: Some("trace-2".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
}
3956
/// Golden test pinning the full nerdstats JSON schema. Any key added,
/// removed, or renamed in `workflow_nerdstats` must update this expected
/// value deliberately — downstream consumers rely on this exact shape.
#[test]
fn workflow_nerdstats_schema_contract_is_stable() {
    // Two steps: an llm_call with full usage and a switch with no usage,
    // exercising both the populated and the sparse step_details encodings.
    let output = YamlWorkflowRunOutput {
        workflow_id: "schema-workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["classify".to_string(), "route".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "route".to_string(),
        terminal_output: None,
        step_timings: vec![
            YamlStepTiming {
                node_id: "classify".to_string(),
                node_kind: "llm_call".to_string(),
                model_name: Some("gpt-4.1".to_string()),
                elapsed_ms: 100,
                prompt_tokens: Some(11),
                completion_tokens: Some(22),
                total_tokens: Some(33),
                reasoning_tokens: Some(7),
                tokens_per_second: Some(220.0),
            },
            YamlStepTiming {
                node_id: "route".to_string(),
                node_kind: "switch".to_string(),
                model_name: None,
                elapsed_ms: 0,
                prompt_tokens: None,
                completion_tokens: None,
                total_tokens: None,
                reasoning_tokens: None,
                tokens_per_second: None,
            },
        ],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(9),
        total_input_tokens: 11,
        total_output_tokens: 22,
        total_tokens: 33,
        total_reasoning_tokens: Some(7),
        tokens_per_second: 220.0,
        trace_id: Some("trace-schema".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    // Note: None-valued step fields are omitted entirely (see "route").
    let expected = json!({
        "workflow_id": "schema-workflow",
        "terminal_node": "route",
        "total_elapsed_ms": 100,
        "ttft_ms": 9,
        "step_details": [
            {
                "node_id": "classify",
                "node_kind": "llm_call",
                "model_name": "gpt-4.1",
                "elapsed_ms": 100,
                "prompt_tokens": 11,
                "completion_tokens": 22,
                "total_tokens": 33,
                "reasoning_tokens": 7,
                "tokens_per_second": 220.0
            },
            {
                "node_id": "route",
                "node_kind": "switch",
                "elapsed_ms": 0
            }
        ],
        "total_input_tokens": 11,
        "total_output_tokens": 22,
        "total_tokens": 33,
        "total_reasoning_tokens": 7,
        "tokens_per_second": 220.0,
        "trace_id": "trace-schema",
        "token_metrics_available": true,
        "token_metrics_source": "provider_usage",
        "llm_nodes_without_usage": []
    });

    assert_eq!(nerdstats, expected);
}
4041
/// Mock executor that asserts the request carries a two-message chat history.
struct MessageHistoryExecutor;
4043
4044 #[async_trait]
4045 impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
4046 async fn complete_structured(
4047 &self,
4048 request: YamlLlmExecutionRequest,
4049 _event_sink: Option<&dyn YamlWorkflowEventSink>,
4050 ) -> Result<YamlLlmExecutionResult, String> {
4051 let messages = request
4052 .messages
4053 .ok_or_else(|| "expected messages in request".to_string())?;
4054 if messages.len() != 2 {
4055 return Err(format!("expected 2 messages, got {}", messages.len()));
4056 }
4057 Ok(YamlLlmExecutionResult {
4058 payload: json!({"category":"termination","reason":"history"}),
4059 usage: Some(YamlLlmTokenUsage {
4060 prompt_tokens: 7,
4061 completion_tokens: 3,
4062 total_tokens: 10,
4063 reasoning_tokens: None,
4064 }),
4065 ttft_ms: None,
4066 tool_calls: Vec::new(),
4067 })
4068 }
4069 }
4070
4071 #[tokio::test]
4072 async fn supports_messages_path_in_workflow_input() {
4073 let yaml = r#"
4074id: email-intake-classification
4075entry_node: classify_top_level
4076nodes:
4077 - id: classify_top_level
4078 node_type:
4079 llm_call:
4080 model: gpt-4.1
4081 messages_path: input.messages
4082 append_prompt_as_user: false
4083"#;
4084
4085 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4086 let input = json!({
4087 "email_text": "ignored",
4088 "messages": [
4089 {"role": "system", "content": "You are a classifier"},
4090 {"role": "user", "content": "Please classify this"}
4091 ]
4092 });
4093
4094 let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
4095 .await
4096 .expect("workflow should use chat history from input");
4097
4098 assert_eq!(output.terminal_node, "classify_top_level");
4099 assert_eq!(
4100 output.outputs["classify_top_level"]["output"]["reason"],
4101 Value::String("history".to_string())
4102 );
4103 }
4104
4105 #[tokio::test]
4106 async fn wrapper_entrypoints_produce_equivalent_outputs() {
4107 let yaml = r#"
4108id: wrapper-equivalence
4109entry_node: classify
4110nodes:
4111 - id: classify
4112 node_type:
4113 llm_call:
4114 model: gpt-4.1
4115 config:
4116 prompt: |
4117 Classify this email into exactly one category:
4118 {{ input.email_text }}
4119"#;
4120
4121 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4122 let input = json!({"email_text":"hello"});
4123
4124 let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
4125 .await
4126 .expect("base entrypoint should execute");
4127 let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
4128 .await
4129 .expect("custom worker wrapper should execute");
4130 let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
4131 &workflow,
4132 &input,
4133 &MockExecutor,
4134 None,
4135 None,
4136 &YamlWorkflowRunOptions::default(),
4137 )
4138 .await
4139 .expect("events/options wrapper should execute");
4140
4141 assert_eq!(a.workflow_id, b.workflow_id);
4142 assert_eq!(a.workflow_id, c.workflow_id);
4143 assert_eq!(a.terminal_node, b.terminal_node);
4144 assert_eq!(a.terminal_node, c.terminal_node);
4145 assert_eq!(a.outputs, b.outputs);
4146 assert_eq!(a.outputs, c.outputs);
4147 assert_eq!(a.total_tokens, b.total_tokens);
4148 assert_eq!(a.total_tokens, c.total_tokens);
4149 }
4150
/// Tool-calling workflow: the first node runs a tool roundtrip whose result
/// is recorded under the `audit` global, and the second node references that
/// global in its prompt template.
#[tokio::test]
async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
    let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    // The worker answers the tool call with the customer name the second
    // node's template later resolves.
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
    // The tool roundtrip is captured on the node's output.
    assert_eq!(
        output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
        Value::String("Ava".to_string())
    );
    // The globals reference resolved into the personalized greeting.
    let body = output.outputs["personalize"]["output"]["body"]
        .as_str()
        .expect("body should be string");
    assert!(body.contains("Ava"));
}
4233
/// Reasoning tokens reported by the provider (4, via ReasoningUsageProvider)
/// must survive into the run output's totals, per-step timings, and the
/// derived nerdstats.
#[tokio::test]
async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
    let yaml = r#"
id: reasoning-usage-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ReasoningUsageProvider))
        .build()
        .expect("client should build");

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.total_reasoning_tokens, Some(4));
    assert_eq!(output.step_timings.len(), 1);
    assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(
        nerdstats["total_reasoning_tokens"],
        Value::Number(4u64.into())
    );
    assert_eq!(
        nerdstats["step_details"][0]["reasoning_tokens"],
        Value::Number(4u64.into())
    );
}
4286
/// ToolLoopReasoningProvider reports 2 reasoning tokens on the tool-call
/// turn and 3 on the final turn; the node must accumulate them to 5 across
/// the roundtrip, in both the output and nerdstats.
#[tokio::test]
async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
    let yaml = r#"
id: tool-reasoning-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopReasoningProvider))
        .build()
        .expect("client should build");
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    // 2 (tool-call turn) + 3 (final turn) reasoning tokens.
    assert_eq!(output.total_reasoning_tokens, Some(5));
    assert_eq!(output.step_timings.len(), 1);
    assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(
        nerdstats["total_reasoning_tokens"],
        Value::Number(5u64.into())
    );
    assert_eq!(
        nerdstats["step_details"][0]["reasoning_tokens"],
        Value::Number(5u64.into())
    );
}
4355
// A tool worker whose payload violates the declared tool output_schema must
// abort the node with an Llm error instead of feeding bad data back to the model.
#[tokio::test]
async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
    let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    // Payload deliberately violates the tool's output_schema (missing required
    // `customer_name`, unexpected extra key).
    let worker = FixedToolWorker {
        payload: json!({"unexpected": "shape"}),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should hard-fail on schema mismatch");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("output failed schema validation"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }
}
4417
// When the model requests a tool name that is not declared on the node, the
// run must fail before the custom worker is ever invoked.
#[tokio::test]
async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
    let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
    config:
      output_schema:
        type: object
        properties:
          state: { type: string }
        required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // Provider stub that requests a tool absent from the YAML definition.
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(UnknownToolProvider))
        .build()
        .expect("client should build");
    // Counts invocations so we can prove the worker never ran.
    let worker = CountingToolWorker {
        execute_calls: AtomicUsize::new(0),
    };

    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should reject unknown tool before executing worker");

    match error {
        YamlWorkflowRunError::Llm { message, .. } => {
            assert!(message.contains("model requested unknown tool 'unknown_tool'"));
        }
        other => panic!("expected llm error, got {other:?}"),
    }

    // The rejection must happen before any worker execution.
    assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
}
4474
// Declaring `tools_format: openai` while supplying simplified-style tool
// entries must produce an `invalid_tools_format` diagnostic.
#[test]
fn validates_tools_format_mismatch() {
    let yaml = r#"
id: mismatch
entry_node: generate
nodes:
  - id: generate
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: openai
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let diagnostics = verify_yaml_workflow(&workflow);
    let flagged = diagnostics
        .iter()
        .any(|diagnostic| diagnostic.code == "invalid_tools_format");
    assert!(flagged);
}
4501
// Running a workflow with a custom_worker node while no worker executor is
// configured must fail with a CustomWorker error naming the node.
#[tokio::test]
async fn custom_worker_node_requires_executor() {
    let yaml = r#"
id: missing-custom-worker
entry_node: enrich
nodes:
  - id: enrich
    node_type:
      custom_worker:
        handler: GetEmployeeRecord
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");

    // The custom-worker argument is None — that is what triggers the failure.
    let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text": "hello"}),
        &client,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should fail without custom worker executor");

    match error {
        YamlWorkflowRunError::CustomWorker { node_id, message } => {
            assert_eq!(node_id, "enrich");
            assert!(message.contains("requires a configured custom worker executor"));
        }
        other => panic!("expected custom worker error, got {other:?}"),
    }
}
4539
// The context JSON handed to a custom worker must carry the trace block:
// trace.context (ids / traceparent) and trace.tenant (conversation/request ids).
#[tokio::test]
async fn custom_worker_receives_trace_context_block() {
    let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: demo
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // Worker that records the context it is invoked with.
    let worker = CapturingWorker {
        context: Mutex::new(None),
    };
    // Fixed trace ids so the assertions below can match exact values.
    let options = YamlWorkflowRunOptions {
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                trace_id: Some("trace-fixed-ctx".to_string()),
                traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext {
                conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                request_id: Some("turn-7".to_string()),
                ..YamlWorkflowTraceTenantContext::default()
            },
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        Some(&worker),
        None,
        &options,
    )
    .await
    .expect("workflow should execute");

    let captured_context = worker
        .context
        .lock()
        .expect("capturing worker lock should not be poisoned")
        .clone()
        .expect("custom worker should receive context");

    // trace.context carries the fixed ids supplied via the run options.
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("context"))
            .and_then(|context| context.get("trace_id"))
            .and_then(Value::as_str),
        Some("trace-fixed-ctx")
    );
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("context"))
            .and_then(|context| context.get("traceparent"))
            .and_then(Value::as_str),
        Some("00-trace-fixed-ctx-span-fixed-01")
    );
    // trace.tenant carries the tenant identifiers.
    assert_eq!(
        captured_context
            .get("trace")
            .and_then(|trace| trace.get("tenant"))
            .and_then(|tenant| tenant.get("conversation_id"))
            .and_then(Value::as_str),
        Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
    );
}
4618
// If the event sink signals cancellation on the first event, the workflow
// must stop before the LLM executor is ever called.
#[tokio::test]
async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
    let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // Executor that counts how many times it is invoked.
    let executor = CountingExecutor {
        call_count: AtomicUsize::new(0),
    };
    // Sink that requests cancellation as soon as it sees the first event.
    let sink = CancelAfterFirstEventSink {
        cancelled: AtomicBool::new(false),
    };

    let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &executor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect_err("workflow should stop when sink signals cancellation");

    assert!(matches!(
        err,
        YamlWorkflowRunError::EventSinkCancelled { .. }
    ));
    // Cancellation must pre-empt any LLM execution.
    assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
}
4660
4661 #[tokio::test]
4662 async fn rejects_invalid_messages_path_shape() {
4663 let yaml = r#"
4664id: email-intake-classification
4665entry_node: classify_top_level
4666nodes:
4667 - id: classify_top_level
4668 node_type:
4669 llm_call:
4670 model: gpt-4.1
4671 messages_path: input.messages
4672"#;
4673
4674 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
4675 let input = json!({
4676 "email_text": "ignored",
4677 "messages": "not-a-list"
4678 });
4679
4680 let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
4681 .await
4682 .expect_err("workflow should fail for invalid messages shape");
4683
4684 assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
4685 }
4686
// Switch branches should render as labelled mermaid edges ("route1", "default")
// while plain edges render unlabelled.
#[test]
fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
    let yaml = r#"
id: chat-workflow
entry_node: decide
nodes:
  - id: decide
    node_type:
      switch:
        branches:
          - condition: '$.input.mode == "draft"'
            target: draft
        default: ask
  - id: draft
    node_type:
      llm_call:
        model: gpt-4.1
  - id: ask
    node_type:
      llm_call:
        model: gpt-4.1
edges:
  - from: draft
    to: ask
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let mermaid = yaml_workflow_to_mermaid(&workflow);

    assert!(mermaid.contains("flowchart TD"));
    // Branch edges carry their route labels; the default branch is labelled "default".
    assert!(mermaid.contains("decide -- \"route1\" --> draft"));
    assert!(mermaid.contains("decide -- \"default\" --> ask"));
    // Ordinary edges are unlabelled.
    assert!(mermaid.contains("draft --> ask"));
}
4721
// Tools attached to an LLM node should appear as dash-linked mermaid nodes
// named <node>__tool_<index> with the dedicated "toolNode" class styling.
#[test]
fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
    let yaml = r#"
id: tool-graph
entry_node: chat
nodes:
  - id: chat
    node_type:
      llm_call:
        model: gemini-3-flash
        tools_format: simplified
        tools:
          - name: run_workflow_graph
            input_schema:
              type: object
              properties:
                workflow_id: { type: string }
              required: [workflow_id]
edges: []
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let mermaid = yaml_workflow_to_mermaid(&workflow);

    assert!(mermaid.contains("chat__tool_0"));
    // Dashed edge from the LLM node to its tool node.
    assert!(mermaid.contains("chat -.-> chat__tool_0"));
    assert!(mermaid.contains("classDef toolNode"));
    assert!(mermaid.contains("class chat__tool_0 toolNode;"));
}
4751
// An orchestrator workflow whose tool calls a sibling workflow file should
// render both the main graph and the referenced subgraph as mermaid clusters.
//
// Fix: the temporary directory is now removed BEFORE unwrapping/asserting, so
// a rendering failure or failed assertion no longer leaks the temp dir.
#[test]
fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
    // Nanosecond-stamped directory name so parallel test runs do not collide.
    let base_dir = std::env::temp_dir().join(format!(
        "simple_agents_mermaid_subgraph_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("unix epoch")
            .as_nanos()
    ));
    fs::create_dir_all(&base_dir).expect("temp dir should be created");

    let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
    let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");

    // Main workflow whose prompt instructs a run_workflow_graph tool call that
    // targets the subgraph workflow id.
    let orchestrator_yaml = r#"
id: email-chat-orchestrator-with-subgraph-tool
entry_node: respond_casual
nodes:
  - id: respond_casual
    node_type:
      llm_call:
        model: gemini-3-flash
        tools_format: simplified
        tools:
          - name: run_workflow_graph
            input_schema:
              type: object
              properties:
                workflow_id: { type: string }
              required: [workflow_id]
    config:
      prompt: |
        Call with:
        {
          "workflow_id": "hr-warning-email-subgraph"
        }
edges: []
"#;

    // The referenced subgraph workflow, written next to the orchestrator.
    let subgraph_yaml = r#"
id: hr-warning-email-subgraph
entry_node: draft_hr_warning_email
nodes:
  - id: draft_hr_warning_email
    node_type:
      llm_call:
        model: gemini-3-flash
edges: []
"#;

    fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
    fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");

    let render_result = yaml_workflow_file_to_mermaid(&orchestrator_path);

    // Clean up before unwrapping/asserting so any panic below cannot leak the dir.
    fs::remove_dir_all(&base_dir).expect("temp dir removed");

    let mermaid = render_result.expect("mermaid should render");
    assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
    assert!(mermaid.contains("Subgraph: hr-warning-email-subgraph"));
    assert!(mermaid.contains("calls hr-warning-email-subgraph"));
    // Subgraph node ids are namespaced to avoid collisions with main-graph ids.
    assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
}
4815
4816 #[tokio::test]
4817 async fn rejects_non_yaml_workflow_file_extension() {
4818 let file_path = std::env::temp_dir().join(format!(
4819 "simple_agents_workflow_{}.txt",
4820 std::time::SystemTime::now()
4821 .duration_since(std::time::UNIX_EPOCH)
4822 .expect("unix epoch")
4823 .as_nanos()
4824 ));
4825 fs::write(&file_path, "not yaml").expect("temporary file should be created");
4826
4827 let result =
4828 run_workflow_yaml_file(&file_path, &json!({"email_text": "hello"}), &MockExecutor)
4829 .await;
4830
4831 match result {
4832 Err(YamlWorkflowRunError::FileRejected { path, reason }) => {
4833 assert!(path.ends_with(".txt"));
4834 assert!(reason.contains(".yaml or .yml"));
4835 }
4836 other => panic!("expected file rejection error, got {other:?}"),
4837 }
4838
4839 let _ = fs::remove_file(&file_path);
4840 }
4841
// YAML -> IR conversion keeps every declared node and prepends the synthetic
// `__yaml_start` entry node.
#[test]
fn converts_yaml_workflow_to_ir_definition() {
    let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        classify
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.kind == "x"'
            target: done
        default: done
  - id: done
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: test
edges:
  - from: classify
    to: route
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");

    assert_eq!(ir.name, "chat-workflow");
    // Synthetic entry node added by the converter.
    assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
    assert!(ir.nodes.iter().any(|n| n.id == "classify"));
    assert!(ir.nodes.iter().any(|n| n.id == "route"));
    assert!(ir.nodes.iter().any(|n| n.id == "done"));
}
4883
// An llm_call node that uses `messages_path` must still convert to IR; the
// call is lowered to a `__yaml_llm_call` tool node.
#[test]
fn supports_yaml_to_ir_when_messages_path_is_used() {
    let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let ir =
        yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");

    let has_llm_tool_node = ir.nodes.iter().any(|node| {
        if let crate::ir::NodeKind::Tool { ref tool, .. } = node.kind {
            tool == "__yaml_llm_call"
        } else {
            false
        }
    });
    assert!(has_llm_tool_node);
}
4905
// Sampling controls (max_tokens / temperature / top_p) declared on the YAML
// llm_call must be carried into the `__yaml_llm_call` tool node's input.
#[test]
fn yaml_to_ir_preserves_llm_sampling_controls() {
    let yaml = r#"
id: llm-controls
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        max_tokens: 120
        temperature: 0.3
        top_p: 0.9
    config:
      prompt: "classify"
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");
    // Locate the lowered LLM tool node and grab its input payload.
    let llm_tool_input = ir
        .nodes
        .iter()
        .find_map(|node| {
            if let crate::ir::NodeKind::Tool { tool, input, .. } = &node.kind {
                if tool == "__yaml_llm_call" {
                    return Some(input);
                }
            }
            None
        })
        .expect("expected __yaml_llm_call tool node");

    assert_eq!(
        llm_tool_input.get("max_tokens").and_then(Value::as_u64),
        Some(120)
    );
    // Floats are compared with a tolerance to tolerate JSON round-trip noise.
    let temperature = llm_tool_input
        .get("temperature")
        .and_then(Value::as_f64)
        .expect("temperature should be preserved");
    assert!((temperature - 0.3).abs() < 1e-6);
    let top_p = llm_tool_input
        .get("top_p")
        .and_then(Value::as_f64)
        .expect("top_p should be preserved");
    assert!((top_p - 0.9).abs() < 1e-6);
}
4953
// `handler_file` declared on a custom_worker node must be forwarded verbatim
// to the custom worker executor.
#[tokio::test]
async fn ir_custom_worker_path_preserves_handler_file() {
    let yaml = r#"
id: worker-handler-file
entry_node: worker
nodes:
  - id: worker
    node_type:
      custom_worker:
        handler: GetRagData
        handler_file: handlers.py
    config:
      payload:
        topic: clarification
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // Worker that records the handler_file it receives.
    let worker = HandlerFileCapturingWorker {
        handler_file: Mutex::new(None),
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let captured = worker
        .handler_file
        .lock()
        .expect("handler-file lock should not be poisoned")
        .clone();
    assert_eq!(captured.as_deref(), Some("handlers.py"));
}
4993
// The generated trace id must appear both as `output.trace_id` and under
// `metadata.telemetry.trace_id`, and the two values must agree.
#[tokio::test]
async fn workflow_output_contains_trace_id_in_both_locations() {
    let yaml = r#"
id: trace-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
        .await
        .expect("workflow should execute");

    let trace_id = output
        .trace_id
        .as_deref()
        .expect("trace_id should be present");
    assert!(!trace_id.is_empty());
    // The same id must be mirrored inside the telemetry metadata block.
    assert_eq!(
        output.metadata.as_ref().and_then(|value| {
            value
                .get("telemetry")
                .and_then(|telemetry| telemetry.get("trace_id"))
                .and_then(Value::as_str)
        }),
        Some(trace_id)
    );
}
5030
// sample_rate = 0.0 must keep the caller-supplied trace id but mark the trace
// as unsampled via `metadata.telemetry.sampled == false`.
#[tokio::test]
async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
    let yaml = r#"
id: sample-rate-zero
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            sample_rate: 0.0,
            ..YamlWorkflowTelemetryConfig::default()
        },
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                trace_id: Some("trace-sample-zero".to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext::default(),
        },
        model: None,
    };

    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        None,
        &options,
    )
    .await
    .expect("workflow should execute");

    // The trace id passes through unchanged; only the sampled flag is affected.
    assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
    assert_eq!(
        output
            .metadata
            .as_ref()
            .and_then(|value| value.get("telemetry"))
            .and_then(|telemetry| telemetry.get("sampled"))
            .and_then(Value::as_bool),
        Some(false)
    );
}
5085
// A W3C traceparent header has the form version-traceid-spanid-flags; the
// second field is the 32-hex-char trace id.
#[test]
fn trace_id_from_traceparent_parses_w3c_header() {
    let header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
    let parsed = trace_id_from_traceparent(header);
    assert_eq!(parsed.as_deref(), Some("4bf92f3577b34da6a3ce929d0e0e4736"));
}
5094
// When only a traceparent header is supplied, resolve_telemetry_context must
// derive the trace id from it and record Traceparent as the id source.
//
// Fix: build the options in a single struct expression instead of mutating a
// `default()` value (clippy::field_reassign_with_default), matching the style
// used by the sibling run-options tests.
#[test]
fn resolve_telemetry_context_marks_traceparent_source() {
    let options = YamlWorkflowRunOptions {
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                traceparent: Some(
                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
                ),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext::default(),
        },
        ..YamlWorkflowRunOptions::default()
    };

    let context = resolve_telemetry_context(&options, None);

    // With no explicit trace_id, the id is derived from the traceparent header.
    assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
    assert_eq!(
        context.trace_id.as_deref(),
        Some("4bf92f3577b34da6a3ce929d0e0e4736")
    );
}
5113
// When only a traceparent is supplied (no explicit trace_id), the run's trace
// id must be derived from the header's trace-id field.
#[tokio::test]
async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
    let yaml = r#"
id: traceparent-derived
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            sample_rate: 0.0,
            ..YamlWorkflowTelemetryConfig::default()
        },
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                traceparent: Some(traceparent.to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext::default(),
        },
        model: None,
    };

    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        None,
        &options,
    )
    .await
    .expect("workflow should execute");

    // Trace id == the 32-hex-char trace-id field of the traceparent header.
    assert_eq!(
        output.trace_id.as_deref(),
        Some("4bf92f3577b34da6a3ce929d0e0e4736")
    );
    assert_eq!(
        output
            .metadata
            .as_ref()
            .and_then(|value| value.get("telemetry"))
            .and_then(|telemetry| telemetry.get("sampled"))
            .and_then(Value::as_bool),
        Some(false)
    );
}
5172
// sample_rate = 1.0 must mark the trace as sampled while preserving the
// caller-supplied trace id.
#[tokio::test]
async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
    let yaml = r#"
id: sample-rate-one
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            sample_rate: 1.0,
            ..YamlWorkflowTelemetryConfig::default()
        },
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                trace_id: Some("trace-sample-one".to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext::default(),
        },
        model: None,
    };

    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        None,
        &options,
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
    assert_eq!(
        output
            .metadata
            .as_ref()
            .and_then(|value| value.get("telemetry"))
            .and_then(|telemetry| telemetry.get("sampled"))
            .and_then(Value::as_bool),
        Some(true)
    );
}
5227
// With a fixed trace id and a fractional sample rate, the sampling decision
// must be a pure function of the trace id: repeated runs must all agree.
#[tokio::test]
async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
    let yaml = r#"
id: sample-rate-deterministic
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            sample_rate: 0.5,
            ..YamlWorkflowTelemetryConfig::default()
        },
        trace: YamlWorkflowTraceOptions {
            context: Some(YamlWorkflowTraceContextInput {
                trace_id: Some("trace-sample-deterministic".to_string()),
                ..YamlWorkflowTraceContextInput::default()
            }),
            tenant: YamlWorkflowTraceTenantContext::default(),
        },
        model: None,
    };

    // Run the identical workflow/options three times and collect the decisions.
    let mut sampled_values = Vec::new();
    for _ in 0..3 {
        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        let sampled = output
            .metadata
            .as_ref()
            .and_then(|value| value.get("telemetry"))
            .and_then(|telemetry| telemetry.get("sampled"))
            .and_then(Value::as_bool)
            .expect("sampled flag should be present");
        sampled_values.push(sampled);
    }

    // All three runs must reach the same decision for the same trace id.
    assert!(sampled_values
        .iter()
        .all(|value| *value == sampled_values[0]));
}
5287
// A sample_rate above 1.0 must be rejected as invalid input, with the error
// message naming the offending option.
#[tokio::test]
async fn workflow_run_options_reject_invalid_sample_rate() {
    let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            sample_rate: 1.1,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        None,
        &options,
    )
    .await
    .expect_err("invalid sample_rate should fail");

    match err {
        YamlWorkflowRunError::InvalidInput { message } => {
            assert!(message.contains("telemetry.sample_rate"));
        }
        _ => panic!("expected invalid input error for sample_rate"),
    }
}
5331
// NaN is not a valid sample rate: it compares false to every bound, so it
// must be rejected explicitly as invalid input.
#[tokio::test]
async fn workflow_run_options_reject_nan_sample_rate() {
    let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            sample_rate: f32::NAN,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        None,
        &options,
    )
    .await
    .expect_err("nan sample_rate should fail");

    assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
}
5370
// Options built for a subworkflow must inherit the parent's telemetry
// configuration and tenant context, and chain onto the parent's trace id.
#[test]
fn subworkflow_options_inherit_parent_telemetry_configuration() {
    let mut parent_options = YamlWorkflowRunOptions::default();
    parent_options.telemetry.sample_rate = 0.0;
    parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
    parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
    parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());

    // The parent's resolved trace context the subworkflow should chain onto.
    let parent_context = TraceContext {
        trace_id: Some("trace-parent".to_string()),
        span_id: Some("span-parent".to_string()),
        parent_span_id: None,
        traceparent: Some("00-trace-parent-span-parent-01".to_string()),
        tracestate: None,
        baggage: BTreeMap::new(),
    };

    let subworkflow_options =
        build_subworkflow_options(&parent_options, Some(&parent_context), None);

    // Telemetry knobs are copied through unchanged.
    assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
    assert_eq!(
        subworkflow_options.telemetry.payload_mode,
        YamlWorkflowPayloadMode::RedactedPayload
    );
    assert_eq!(
        subworkflow_options.telemetry.tool_trace_mode,
        YamlToolTraceMode::Redacted
    );
    // Tenant context is inherited.
    assert_eq!(
        subworkflow_options.trace.tenant.conversation_id.as_deref(),
        Some("conv-123")
    );
    // The child trace context references the parent's trace id.
    assert_eq!(
        subworkflow_options
            .trace
            .context
            .as_ref()
            .and_then(|ctx| ctx.trace_id.as_deref()),
        Some("trace-parent")
    );
}
5413
// Tenant attributes are written twice: once under the tenant.* namespace and
// once as the langfuse.* aliases (user id / session id) Langfuse understands.
#[test]
fn trace_tenant_attributes_include_langfuse_aliases() {
    let tenant = YamlWorkflowTraceTenantContext {
        workspace_id: Some("ws-1".to_string()),
        user_id: Some("user-1".to_string()),
        conversation_id: Some("conv-1".to_string()),
        request_id: Some("req-1".to_string()),
        run_id: Some("run-1".to_string()),
    };
    let mut span = CapturingSpan::default();
    apply_trace_tenant_attributes_from_tenant(&mut span, &tenant);

    assert_eq!(
        span.attributes
            .get("tenant.workspace_id")
            .map(String::as_str),
        Some("ws-1")
    );
    assert_eq!(
        span.attributes.get("tenant.user_id").map(String::as_str),
        Some("user-1")
    );
    assert_eq!(
        span.attributes
            .get("tenant.conversation_id")
            .map(String::as_str),
        Some("conv-1")
    );
    // Langfuse aliases: user_id -> langfuse.user.id,
    // conversation_id -> langfuse.session.id.
    assert_eq!(
        span.attributes.get("langfuse.user.id").map(String::as_str),
        Some("user-1")
    );
    assert_eq!(
        span.attributes
            .get("langfuse.session.id")
            .map(String::as_str),
        Some("conv-1")
    );
}
5453
// With the nerdstats flag enabled, per-field metrics plus an aggregate
// "nerdstats" attribute are written as Langfuse trace metadata.
#[test]
fn langfuse_nerdstats_attributes_are_written_when_enabled() {
    // Minimal single-step run output with token metrics populated.
    let output = YamlWorkflowRunOutput {
        workflow_id: "wf-1".to_string(),
        entry_node: "start".to_string(),
        email_text: "email".to_string(),
        trace: vec!["node-1".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "node-1".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "node-1".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1-mini".to_string()),
            elapsed_ms: 42,
            prompt_tokens: Some(4),
            completion_tokens: Some(6),
            total_tokens: Some(10),
            reasoning_tokens: Some(2),
            tokens_per_second: Some(14.0),
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 42,
        ttft_ms: Some(7),
        total_input_tokens: 4,
        total_output_tokens: 6,
        total_tokens: 10,
        total_reasoning_tokens: Some(2),
        tokens_per_second: 14.0,
        trace_id: Some("trace-1".to_string()),
        metadata: None,
    };

    let mut span = CapturingSpan::default();
    // `true` enables nerdstats emission.
    apply_langfuse_nerdstats_attributes(&mut span, &output, true);

    assert_eq!(
        span.attributes
            .get("langfuse.trace.metadata.nerdstats.workflow_id")
            .map(String::as_str),
        Some("wf-1")
    );
    // Numeric/boolean metrics are stringified attribute values.
    assert_eq!(
        span.attributes
            .get("langfuse.trace.metadata.nerdstats.total_tokens")
            .map(String::as_str),
        Some("10")
    );
    assert_eq!(
        span.attributes
            .get("langfuse.trace.metadata.nerdstats.token_metrics_available")
            .map(String::as_str),
        Some("true")
    );
    // An aggregate nerdstats attribute is also written.
    assert!(span
        .attributes
        .contains_key("langfuse.trace.metadata.nerdstats"));
}
5513
5514 #[test]
5515 fn langfuse_trace_input_output_and_usage_are_written() {
5516 let output = YamlWorkflowRunOutput {
5517 workflow_id: "wf-1".to_string(),
5518 entry_node: "start".to_string(),
5519 email_text: "email".to_string(),
5520 trace: vec!["node-1".to_string()],
5521 outputs: BTreeMap::new(),
5522 terminal_node: "node-1".to_string(),
5523 terminal_output: Some(json!({"final":"ok"})),
5524 step_timings: Vec::new(),
5525 llm_node_metrics: BTreeMap::new(),
5526 llm_node_models: BTreeMap::new(),
5527 total_elapsed_ms: 10,
5528 ttft_ms: None,
5529 total_input_tokens: 11,
5530 total_output_tokens: 22,
5531 total_tokens: 33,
5532 total_reasoning_tokens: Some(4),
5533 tokens_per_second: 1.5,
5534 trace_id: Some("trace-1".to_string()),
5535 metadata: None,
5536 };
5537
5538 let mut span = CapturingSpan::default();
5539 let input = json!({"email_text":"hello"});
5540 apply_langfuse_trace_input_output_attributes(
5541 &mut span,
5542 &input,
5543 &output,
5544 YamlWorkflowPayloadMode::FullPayload,
5545 );
5546
5547 assert_eq!(
5548 span.attributes
5549 .get("langfuse.trace.input")
5550 .map(String::as_str),
5551 Some("{\"email_text\":\"hello\"}")
5552 );
5553 assert_eq!(
5554 span.attributes
5555 .get("langfuse.trace.output")
5556 .map(String::as_str),
5557 Some("{\"final\":\"ok\"}")
5558 );
5559 assert!(span
5560 .attributes
5561 .contains_key("langfuse.trace.metadata.usage_details"));
5562 }
5563
5564 #[test]
5565 fn langfuse_observation_usage_attributes_are_written() {
5566 let usage = YamlLlmTokenUsage {
5567 prompt_tokens: 7,
5568 completion_tokens: 9,
5569 total_tokens: 16,
5570 reasoning_tokens: Some(3),
5571 };
5572 let mut span = CapturingSpan::default();
5573 apply_langfuse_observation_usage_attributes(&mut span, &usage);
5574
5575 assert_eq!(
5576 span.attributes
5577 .get("gen_ai.usage.input_tokens")
5578 .map(String::as_str),
5579 Some("7")
5580 );
5581 assert_eq!(
5582 span.attributes
5583 .get("gen_ai.usage.output_tokens")
5584 .map(String::as_str),
5585 Some("9")
5586 );
5587 assert_eq!(
5588 span.attributes
5589 .get("gen_ai.usage.total_tokens")
5590 .map(String::as_str),
5591 Some("16")
5592 );
5593 assert_eq!(
5594 span.attributes
5595 .get("gen_ai.usage.reasoning_tokens")
5596 .map(String::as_str),
5597 Some("3")
5598 );
5599 assert!(span
5600 .attributes
5601 .contains_key("langfuse.observation.usage_details"));
5602 }
5603
    #[tokio::test]
    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
        // Single-node workflow; the interesting behavior under test lives in
        // the run options, not the graph itself.
        let yaml = r#"
id: trace-options-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Options supply an explicit trace id (with a matching traceparent), a
        // redacted payload mode, and a tenant conversation id.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-123".to_string()),
                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // The explicitly supplied trace id wins over any generated one.
        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
        // Run metadata echoes the effective telemetry payload mode...
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("payload_mode"))
                .and_then(Value::as_str),
            Some("redacted_payload")
        );
        // ...and the tenant context carried in the trace options.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("trace"))
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
        );
    }
5672
5673 #[test]
5674 fn streamed_payload_parser_extracts_last_json_object() {
5675 let raw = r#"{"state":"missing_scenario","reason":"ok"}
5676
5677extra reasoning text
5678
5679{"state":"ready","reason":"final"}"#;
5680
5681 let resolved = parse_streamed_structured_payload(raw, false)
5682 .expect("parser should extract final JSON object");
5683 assert_eq!(resolved.payload["state"], "ready");
5684 assert!(resolved.heal_confidence.is_none());
5685 }
5686
5687 #[test]
5688 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
5689 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
5690
5691 let resolved = parse_streamed_structured_payload(raw, false)
5692 .expect("parser should recover final structured JSON object");
5693 assert_eq!(resolved.payload["state"], "ready");
5694 }
5695
5696 #[test]
5697 fn streamed_payload_parser_handles_markdown_with_heal() {
5698 let raw = r#"Some preface
5699```json
5700{
5701 "state": "missing_scenario",
5702 "reason": "Need more details"
5703}
5704```
5705Some trailing explanation"#;
5706
5707 let resolved = parse_streamed_structured_payload(raw, true)
5708 .expect("heal path should parse JSON block");
5709 assert_eq!(resolved.payload["state"], "missing_scenario");
5710 assert!(resolved.heal_confidence.is_some());
5711 }
5712
5713 #[test]
5714 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
5715 let raw = "No JSON in this streamed output";
5716 let error = parse_streamed_structured_payload(raw, false)
5717 .expect_err("strict stream parse should fail without JSON candidate");
5718 assert!(error.contains("no JSON object candidate found"));
5719 }
5720
5721 #[test]
5722 fn include_raw_stream_debug_events_defaults_to_false() {
5723 let _guard = stream_debug_env_lock()
5724 .lock()
5725 .expect("stream debug env lock should not be poisoned");
5726 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5727 assert!(!include_raw_stream_debug_events());
5728 }
5729
5730 #[test]
5731 fn include_raw_stream_debug_events_accepts_truthy_values() {
5732 let _guard = stream_debug_env_lock()
5733 .lock()
5734 .expect("stream debug env lock should not be poisoned");
5735 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
5736 assert!(include_raw_stream_debug_events());
5737 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
5738 }
5739
5740 #[test]
5741 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
5742 let mut filter = StructuredJsonDeltaFilter::default();
5743 let chunks = vec![
5744 "I will think first... ",
5745 "{\"state\":\"missing_scenario\",",
5746 "\"reason\":\"Need more details\"}",
5747 " additional commentary",
5748 ];
5749
5750 let filtered = chunks
5751 .into_iter()
5752 .filter_map(|chunk| filter.split(chunk).0)
5753 .collect::<String>();
5754
5755 assert_eq!(
5756 filtered,
5757 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
5758 );
5759 }
5760
5761 #[test]
5762 fn structured_json_delta_filter_handles_braces_inside_strings() {
5763 let mut filter = StructuredJsonDeltaFilter::default();
5764 let chunks = vec![
5765 "preface ",
5766 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
5767 " trailing",
5768 ];
5769
5770 let filtered = chunks
5771 .into_iter()
5772 .filter_map(|chunk| filter.split(chunk).0)
5773 .collect::<String>();
5774
5775 assert_eq!(
5776 filtered,
5777 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
5778 );
5779 }
5780
5781 #[test]
5782 fn render_json_object_as_text_converts_top_level_fields() {
5783 let rendered =
5784 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
5785 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
5786
5787 assert_eq!(lines.len(), 3);
5788 assert!(lines.contains("question: q"));
5789 assert!(lines.contains("confidence: 0.8"));
5790 assert!(lines.contains("nested: {\"a\":1}"));
5791 }
5792
5793 #[test]
5794 fn stream_json_as_text_formatter_emits_once_when_complete() {
5795 let mut formatter = StreamJsonAsTextFormatter::default();
5796 formatter.push("{\"question\":\"hello\"}");
5797
5798 let first = formatter.emit_if_ready(true);
5799 let second = formatter.emit_if_ready(true);
5800
5801 assert_eq!(first, Some("question: hello".to_string()));
5802 assert_eq!(second, None);
5803 }
5804
5805 #[test]
5806 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
5807 let expr = "$.nodes.classify.output.output_total == 1";
5808 let rewritten = rewrite_yaml_condition_to_ir(expr).expect("condition should rewrite");
5809 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
5810 }
5811
5812 #[test]
5813 fn rewrite_yaml_condition_does_not_mutate_paths_in_string_literals() {
5814 let expr = "$.nodes.classify.output.state == \"$.nodes.keep.output\"";
5815 let rewritten = rewrite_yaml_condition_to_ir(expr).expect("condition should rewrite");
5816 assert_eq!(
5817 rewritten,
5818 "$.node_outputs.classify.state == \"$.nodes.keep.output\""
5819 );
5820 }
5821
5822 #[test]
5823 fn rewrite_yaml_condition_rejects_unterminated_quotes() {
5824 let err = rewrite_yaml_condition_to_ir("$.nodes.classify.output.state == \"broken")
5825 .expect_err("unterminated quote should fail");
5826 assert!(err.contains("unterminated quoted string"));
5827 }
5828
5829 #[test]
5830 fn yaml_llm_call_rejects_unknown_provider_field() {
5831 let yaml = r#"
5832id: wf
5833entry_node: classify
5834nodes:
5835 - id: classify
5836 node_type:
5837 llm_call:
5838 provider: openai
5839 model: gpt-4.1-mini
5840 config:
5841 prompt: hi
5842"#;
5843
5844 let err = serde_yaml::from_str::<YamlWorkflow>(yaml)
5845 .expect_err("unknown llm_call fields should fail parsing");
5846 let message = err.to_string();
5847 assert!(message.contains("provider"));
5848 assert!(message.contains("unknown field"));
5849 }
5850
5851 #[test]
5852 fn yaml_custom_worker_rejects_unknown_language_field() {
5853 let yaml = r#"
5854id: wf
5855entry_node: worker
5856nodes:
5857 - id: worker
5858 node_type:
5859 custom_worker:
5860 language: python
5861 handler: get_rag_data
5862"#;
5863
5864 let err = serde_yaml::from_str::<YamlWorkflow>(yaml)
5865 .expect_err("unknown custom_worker fields should fail parsing");
5866 let message = err.to_string();
5867 assert!(message.contains("language"));
5868 assert!(message.contains("unknown field"));
5869 }
5870
5871 #[tokio::test]
5872 async fn custom_worker_handler_file_requires_executor() {
5873 let yaml = r#"
5874id: wf
5875entry_node: lookup
5876nodes:
5877 - id: lookup
5878 node_type:
5879 custom_worker:
5880 handler: get_rag_data
5881 handler_file: handlers.py
5882"#;
5883
5884 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5885 let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
5886 &workflow,
5887 &json!({"email_text": "hello"}),
5888 &MockExecutor,
5889 None,
5890 None,
5891 &YamlWorkflowRunOptions::default(),
5892 )
5893 .await
5894 .expect_err("handler_file without executor should fail");
5895
5896 let rendered = err.to_string();
5897 assert!(rendered.contains("handler_file"));
5898 assert!(rendered.contains("no custom worker executor configured"));
5899 }
5900
5901 #[tokio::test]
5902 async fn validates_workflow_input_before_ir_runtime_path() {
5903 let yaml = r#"
5904id: chat-workflow
5905entry_node: classify
5906nodes:
5907 - id: classify
5908 node_type:
5909 llm_call:
5910 model: gpt-4.1
5911 config:
5912 prompt: |
5913 classify
5914"#;
5915
5916 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
5917 let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
5918 .await
5919 .expect_err("non-object input should fail before execution");
5920
5921 assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
5922 }
5923
5924 #[test]
5925 fn interpolate_template_supports_dollar_prefixed_paths() {
5926 let context = json!({
5927 "input": {
5928 "email_text": "hello"
5929 }
5930 });
5931
5932 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
5933 assert_eq!(rendered, "value=hello");
5934 }
5935
5936 #[test]
5937 fn to_typed_output_maps_node_kinds_from_workflow_definition() {
5938 let workflow_yaml = r#"
5939id: typed-output-demo
5940entry_node: classify
5941nodes:
5942 - id: classify
5943 node_type:
5944 llm_call:
5945 model: gpt-4.1
5946 - id: route
5947 node_type:
5948 switch:
5949 branches:
5950 - condition: '$.nodes.classify.output.state == "ready"'
5951 target: worker
5952 default: worker
5953 - id: worker
5954 node_type:
5955 custom_worker:
5956 handler: do_work
5957edges:
5958 - from: classify
5959 to: route
5960 - from: route
5961 to: worker
5962"#;
5963 let workflow: YamlWorkflow =
5964 serde_yaml::from_str(workflow_yaml).expect("workflow should parse");
5965 let output = YamlWorkflowRunOutput {
5966 workflow_id: "typed-output-demo".to_string(),
5967 entry_node: "classify".to_string(),
5968 email_text: String::new(),
5969 trace: vec![
5970 "classify".to_string(),
5971 "route".to_string(),
5972 "worker".to_string(),
5973 ],
5974 outputs: BTreeMap::from([
5975 ("classify".to_string(), json!({"state": "ready"})),
5976 ("route".to_string(), json!("worker")),
5977 ("worker".to_string(), json!({"ok": true})),
5978 ]),
5979 terminal_node: "worker".to_string(),
5980 terminal_output: Some(json!({"ok": true})),
5981 step_timings: Vec::new(),
5982 llm_node_metrics: BTreeMap::new(),
5983 llm_node_models: BTreeMap::new(),
5984 total_elapsed_ms: 0,
5985 ttft_ms: None,
5986 total_input_tokens: 0,
5987 total_output_tokens: 0,
5988 total_tokens: 0,
5989 total_reasoning_tokens: None,
5990 tokens_per_second: 0.0,
5991 trace_id: None,
5992 metadata: None,
5993 };
5994
5995 let typed = output.to_typed_output(&workflow);
5996 assert_eq!(typed.node_outputs.len(), 3);
5997 assert_eq!(
5998 typed
5999 .node_outputs
6000 .iter()
6001 .find(|record| record.node_id == "classify")
6002 .map(|record| record.node_kind),
6003 Some(YamlWorkflowNodeKind::LlmCall)
6004 );
6005 assert_eq!(
6006 typed
6007 .node_outputs
6008 .iter()
6009 .find(|record| record.node_id == "route")
6010 .map(|record| record.node_kind),
6011 Some(YamlWorkflowNodeKind::Switch)
6012 );
6013 assert_eq!(
6014 typed
6015 .node_outputs
6016 .iter()
6017 .find(|record| record.node_id == "worker")
6018 .map(|record| record.node_kind),
6019 Some(YamlWorkflowNodeKind::CustomWorker)
6020 );
6021 assert_eq!(
6022 typed
6023 .terminal_output
6024 .as_ref()
6025 .map(|record| record.node_kind),
6026 Some(YamlWorkflowNodeKind::CustomWorker)
6027 );
6028 }
6029
    #[test]
    fn to_typed_output_marks_unknown_node_ids() {
        // Workflow containing only "start"; the run output below references a
        // node id ("not-in-graph") that the definition does not contain.
        let workflow_yaml = r#"
id: typed-output-unknown
entry_node: start
nodes:
  - id: start
    node_type:
      llm_call:
        model: gpt-4.1
"#;
        let workflow: YamlWorkflow =
            serde_yaml::from_str(workflow_yaml).expect("workflow should parse");
        let output = YamlWorkflowRunOutput {
            workflow_id: "typed-output-unknown".to_string(),
            entry_node: "start".to_string(),
            email_text: String::new(),
            trace: vec!["start".to_string(), "not-in-graph".to_string()],
            outputs: BTreeMap::from([("not-in-graph".to_string(), json!({"v": 1}))]),
            terminal_node: "not-in-graph".to_string(),
            terminal_output: Some(json!({"v": 1})),
            step_timings: Vec::new(),
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 0,
            ttft_ms: None,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_tokens: 0,
            total_reasoning_tokens: None,
            tokens_per_second: 0.0,
            trace_id: None,
            metadata: None,
        };

        let typed = output.to_typed_output(&workflow);
        // Only the single entry in `outputs` produces a typed record, and an
        // id missing from the graph is tagged Unknown rather than dropped.
        assert_eq!(typed.node_outputs.len(), 1);
        assert_eq!(
            typed.node_outputs[0].node_kind,
            YamlWorkflowNodeKind::Unknown
        );
        // The terminal record follows the same Unknown fallback.
        assert_eq!(
            typed
                .terminal_output
                .as_ref()
                .map(|record| record.node_kind),
            Some(YamlWorkflowNodeKind::Unknown)
        );
    }
6079
6080 #[test]
6081 fn to_typed_event_maps_known_event_type() {
6082 let event = YamlWorkflowEvent {
6083 event_type: "node_completed".to_string(),
6084 node_id: Some("classify".to_string()),
6085 step_id: Some("classify".to_string()),
6086 node_kind: Some("llm_call".to_string()),
6087 streamable: Some(false),
6088 message: Some("done".to_string()),
6089 delta: None,
6090 token_kind: None,
6091 is_terminal_node_token: Some(true),
6092 elapsed_ms: Some(11),
6093 metadata: Some(json!({"k": "v"})),
6094 };
6095
6096 let typed = event.to_typed_event();
6097 assert_eq!(typed.event_type, YamlWorkflowEventType::NodeCompleted);
6098 assert_eq!(typed.raw_event_type, "node_completed");
6099 assert_eq!(typed.node_id.as_deref(), Some("classify"));
6100 assert_eq!(
6101 typed.metadata.as_ref().and_then(|value| value.get("k")),
6102 Some(&json!("v"))
6103 );
6104 }
6105
6106 #[test]
6107 fn to_typed_event_marks_unknown_event_type() {
6108 let event = YamlWorkflowEvent {
6109 event_type: "custom_event_not_recognized".to_string(),
6110 node_id: None,
6111 step_id: None,
6112 node_kind: None,
6113 streamable: None,
6114 message: None,
6115 delta: None,
6116 token_kind: None,
6117 is_terminal_node_token: None,
6118 elapsed_ms: None,
6119 metadata: None,
6120 };
6121
6122 let typed = event.to_typed_event();
6123 assert_eq!(typed.event_type, YamlWorkflowEventType::Unknown);
6124 assert_eq!(typed.raw_event_type, "custom_event_not_recognized");
6125 }
6126}