1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14 ToolFunction, ToolType,
15};
16use simple_agents_core::{
17 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24 flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29 WorkflowRuntimeOptions,
30};
31use crate::validation::validate_and_normalize;
32use crate::visualize::workflow_to_mermaid;
33
// Synthetic node ids injected by this runner for YAML-defined workflows —
// NOTE(review): inferred from the names; confirm against the builder code that uses them.
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

// Process-wide sequence mixed into generated trace ids so two ids created in
// the same nanosecond still differ (see `generate_trace_id`).
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
38
/// Timing (and, when available, token usage) recorded for one executed
/// workflow step; serialized into run output and nerdstats.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Node that produced this step.
    pub node_id: String,
    /// Kind label of the node (e.g. "llm_call" — see `workflow_nerdstats`).
    pub node_kind: String,
    /// Model used, present only for LLM steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    pub elapsed_ms: u128,
    // Token fields are None when the provider reported no usage for the step.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
57
/// Aggregated per-LLM-node metrics (keyed by node id in
/// `YamlWorkflowRunOutput::llm_node_metrics`).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Present only when the provider reports reasoning-token usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    pub tokens_per_second: f64,
}
68
/// Complete result of a YAML workflow run: the terminal payload plus the
/// execution trace, per-step timings, token totals, and telemetry identity.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    pub entry_node: String,
    // NOTE(review): field name suggests an email-specific demo payload — confirm
    // whether this should be generalized at the caller.
    pub email_text: String,
    /// Node ids in execution order.
    pub trace: Vec<String>,
    /// Per-node outputs, keyed by node id (BTreeMap for stable serialization order).
    pub outputs: BTreeMap<String, Value>,
    pub terminal_node: String,
    pub terminal_output: Option<Value>,
    pub step_timings: Vec<YamlStepTiming>,
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    /// Model actually used per LLM node.
    pub llm_node_models: BTreeMap<String, String>,
    pub total_elapsed_ms: u128,
    /// Time to first token, when streaming provided one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    pub tokens_per_second: f64,
    /// Resolved telemetry trace id (None when telemetry is disabled).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
95
/// Controls whether span input/output payloads are exported verbatim or
/// replaced with a redaction stub (see `payload_for_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
103
/// Controls how tool-call payloads appear in traces: verbatim, redacted stub,
/// or omitted entirely (see `payload_for_tool_trace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
112
/// Caller-supplied distributed-tracing context; all fields optional so a run
/// can join an existing trace (W3C `traceparent`/`tracestate`) or start fresh.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// W3C Trace Context header value; parsed by `trace_id_from_traceparent`.
    #[serde(default)]
    pub traceparent: Option<String>,
    #[serde(default)]
    pub tracestate: Option<String>,
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
128
/// Multi-tenant identity attached to trace spans and worker contexts
/// (see `apply_trace_tenant_attributes_from_tenant`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
142
/// Telemetry knobs for a run. Serde defaults are supplied by the `default_*`
/// helper functions; the hand-written `Default` impl must stay in sync.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Emit the detailed "nerdstats" metadata block on the root span.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Trace sampling probability in [0.0, 1.0]; validated by `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
160
161impl Default for YamlWorkflowTelemetryConfig {
162 fn default() -> Self {
163 Self {
164 enabled: true,
165 nerdstats: true,
166 sample_rate: 1.0,
167 payload_mode: YamlWorkflowPayloadMode::FullPayload,
168 retention_days: 30,
169 multi_tenant: true,
170 tool_trace_mode: YamlToolTraceMode::Full,
171 }
172 }
173}
174
/// Tracing inputs for a run: optional inherited trace context plus tenant identity.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
182
/// Top-level per-run options: telemetry config, tracing inputs, and an
/// optional model override applied to LLM nodes (see `resolve_requested_model`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    /// When non-blank, overrides every LLM node's configured model.
    #[serde(default)]
    pub model: Option<String>,
}
192
/// Provider-reported token usage for a single LLM call.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// None when the provider does not report reasoning tokens.
    pub reasoning_tokens: Option<u32>,
}
200
/// Outcome of executing one LLM node: the parsed payload plus optional usage,
/// time-to-first-token, and any tool calls made during the turn.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
208
/// Trace record for one tool invocation made by an LLM node.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    pub id: String,
    pub name: String,
    pub arguments: Value,
    /// None when the tool did not run or produced no output.
    pub output: Option<Value>,
    // NOTE(review): status values not visible in this chunk — confirm the set of
    // strings emitted by the executor.
    pub status: String,
    pub elapsed_ms: u128,
    pub error: Option<String>,
}
219
/// Running token totals accumulated across all LLM calls in a run.
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    /// Stays None until at least one call reports reasoning tokens.
    reasoning_tokens: Option<u64>,
}
227
228impl YamlTokenTotals {
229 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
230 self.input_tokens += u64::from(usage.prompt_tokens);
231 self.output_tokens += u64::from(usage.completion_tokens);
232 self.total_tokens += u64::from(usage.total_tokens);
233
234 if let Some(reasoning_tokens) = usage.reasoning_tokens {
235 let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
236 self.reasoning_tokens = Some(next);
237 }
238 }
239
240 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
241 if elapsed_ms == 0 {
242 return 0.0;
243 }
244 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
245 }
246}
247
/// Rounds `value` to two decimal places (half away from zero, like `f64::round`).
fn round_two_decimals(value: f64) -> f64 {
    let in_hundredths = value * 100.0;
    in_hundredths.round() / 100.0
}
251
252fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
253 if elapsed_ms == 0 {
254 return 0.0;
255 }
256 round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
257}
258
/// Picks the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's own configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(overridden) if !overridden.is_empty() => overridden.to_string(),
        _ => node_model.to_string(),
    }
}
271
// Serde default helpers referenced by `#[serde(default = "...")]` attributes on
// `YamlWorkflowTelemetryConfig`; keep in sync with its `Default` impl.
fn default_true() -> bool {
    true
}

fn default_sample_rate() -> f32 {
    1.0
}

fn default_retention_days() -> u32 {
    30
}
283
284fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
285 if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
286 return Ok(());
287 }
288
289 Err(YamlWorkflowRunError::InvalidInput {
290 message: format!(
291 "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
292 ),
293 })
294}
295
/// Deterministic trace sampling: the same `trace_id` always yields the same
/// decision for a given rate, so all spans of one trace agree.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // FNV-1a over the trace id bytes gives a stable pseudo-uniform hash.
    let digest = trace_id.bytes().fold(0xcbf29ce484222325u64, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
    });

    ((digest as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
313
/// Extracts the 32-hex-digit trace id from a W3C `traceparent` header value
/// (`version-trace_id-span_id-flags`), normalized to lowercase.
///
/// Returns `None` for anything other than exactly four segments, a non-2-hex
/// version, a non-32-hex trace id, or the all-zero (invalid) trace id.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let segments: Vec<&str> = traceparent.split('-').collect();
    let [version, trace_id, _span_id, _flags] = segments.as_slice() else {
        return None;
    };

    let version_ok = version.len() == 2 && version.bytes().all(|b| b.is_ascii_hexdigit());
    let trace_id_ok = trace_id.len() == 32
        && trace_id.bytes().all(|b| b.is_ascii_hexdigit())
        && trace_id.bytes().any(|b| b != b'0');

    if version_ok && trace_id_ok {
        Some(trace_id.to_ascii_lowercase())
    } else {
        None
    }
}
340
/// Where the run's trace id came from, in resolution-priority order
/// (see `resolve_run_trace_id_with_source`); surfaced in run metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    Disabled,
    ExplicitTraceId,
    ParentTraceId,
    Traceparent,
    ParentTraceparent,
    Generated,
}
350
/// Result of telemetry resolution: the trace id (None when telemetry is
/// disabled), the sampling decision, and where the id came from.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
357
358fn resolve_run_trace_id_with_source(
359 options: &YamlWorkflowRunOptions,
360 parent_trace_context: Option<&TraceContext>,
361) -> (Option<String>, TraceIdSource) {
362 if !options.telemetry.enabled {
363 return (None, TraceIdSource::Disabled);
364 }
365
366 if let Some(trace_id) = options
367 .trace
368 .context
369 .as_ref()
370 .and_then(|context| context.trace_id.clone())
371 {
372 return (Some(trace_id), TraceIdSource::ExplicitTraceId);
373 }
374
375 if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
376 return (Some(trace_id), TraceIdSource::ParentTraceId);
377 }
378
379 if let Some(trace_id) = options
380 .trace
381 .context
382 .as_ref()
383 .and_then(|context| context.traceparent.as_deref())
384 .and_then(trace_id_from_traceparent)
385 {
386 return (Some(trace_id), TraceIdSource::Traceparent);
387 }
388
389 if let Some(trace_id) = parent_trace_context
390 .and_then(|context| context.traceparent.as_deref())
391 .and_then(trace_id_from_traceparent)
392 {
393 return (Some(trace_id), TraceIdSource::ParentTraceparent);
394 }
395
396 (Some(generate_trace_id()), TraceIdSource::Generated)
397}
398
399fn resolve_telemetry_context(
400 options: &YamlWorkflowRunOptions,
401 parent_trace_context: Option<&TraceContext>,
402) -> ResolvedTelemetryContext {
403 let (trace_id, trace_id_source) =
404 resolve_run_trace_id_with_source(options, parent_trace_context);
405 let sampled = trace_id
406 .as_deref()
407 .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
408 .unwrap_or(false);
409
410 ResolvedTelemetryContext {
411 trace_id,
412 sampled,
413 trace_id_source,
414 }
415}
416
417fn generate_trace_id() -> String {
418 use std::time::{SystemTime, UNIX_EPOCH};
419
420 let now_nanos = SystemTime::now()
421 .duration_since(UNIX_EPOCH)
422 .map(|duration| duration.as_nanos())
423 .unwrap_or(0);
424 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
425 format!("{:032x}", now_nanos ^ sequence)
426}
427
428fn workflow_metadata_with_trace(
429 options: &YamlWorkflowRunOptions,
430 trace_id: &str,
431 sampled: bool,
432 trace_id_source: TraceIdSource,
433) -> Value {
434 json!({
435 "telemetry": {
436 "trace_id": trace_id,
437 "trace_id_source": match trace_id_source {
438 TraceIdSource::Disabled => "disabled",
439 TraceIdSource::ExplicitTraceId => "explicit_trace_id",
440 TraceIdSource::ParentTraceId => "parent_trace_id",
441 TraceIdSource::Traceparent => "traceparent",
442 TraceIdSource::ParentTraceparent => "parent_traceparent",
443 TraceIdSource::Generated => "generated",
444 },
445 "enabled": options.telemetry.enabled,
446 "sampled": sampled,
447 "nerdstats": options.telemetry.nerdstats,
448 "sample_rate": options.telemetry.sample_rate,
449 "payload_mode": match options.telemetry.payload_mode {
450 YamlWorkflowPayloadMode::FullPayload => "full_payload",
451 YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
452 },
453 "retention_days": options.telemetry.retention_days,
454 "multi_tenant": options.telemetry.multi_tenant,
455 "tool_trace_mode": match options.telemetry.tool_trace_mode {
456 YamlToolTraceMode::Full => "full",
457 YamlToolTraceMode::Redacted => "redacted",
458 YamlToolTraceMode::Off => "off",
459 },
460 },
461 "trace": {
462 "tenant": {
463 "workspace_id": options.trace.tenant.workspace_id,
464 "user_id": options.trace.tenant.user_id,
465 "conversation_id": options.trace.tenant.conversation_id,
466 "request_id": options.trace.tenant.request_id,
467 "run_id": options.trace.tenant.run_id,
468 }
469 },
470 })
471}
472
473fn apply_trace_identity_attributes(span: &mut dyn WorkflowSpan, trace_id: Option<&str>) {
474 if let Some(value) = trace_id {
475 span.set_attribute("trace_id", value);
476 }
477}
478
479fn apply_trace_tenant_attributes_from_tenant(
480 span: &mut dyn WorkflowSpan,
481 tenant: &YamlWorkflowTraceTenantContext,
482) {
483 if let Some(workspace_id) = tenant.workspace_id.as_deref() {
484 span.set_attribute("tenant.workspace_id", workspace_id);
485 }
486 if let Some(user_id) = tenant.user_id.as_deref() {
487 span.set_attribute("tenant.user_id", user_id);
488 span.set_attribute("user.id", user_id);
489 span.set_attribute("langfuse.user.id", user_id);
490 }
491 if let Some(conversation_id) = tenant.conversation_id.as_deref() {
492 span.set_attribute("tenant.conversation_id", conversation_id);
493 span.set_attribute("session.id", conversation_id);
494 span.set_attribute("langfuse.session.id", conversation_id);
495 }
496 if let Some(request_id) = tenant.request_id.as_deref() {
497 span.set_attribute("tenant.request_id", request_id);
498 }
499 if let Some(run_id) = tenant.run_id.as_deref() {
500 span.set_attribute("tenant.run_id", run_id);
501 }
502}
503
/// Convenience wrapper: applies tenant attributes taken from the run options.
fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
    apply_trace_tenant_attributes_from_tenant(span, &options.trace.tenant);
}
507
508fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
509 let llm_nodes_without_usage: Vec<String> = output
510 .step_timings
511 .iter()
512 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
513 .map(|step| step.node_id.clone())
514 .collect();
515 let token_metrics_available = llm_nodes_without_usage.is_empty();
516 let token_metrics_source = if token_metrics_available {
517 "provider_usage"
518 } else {
519 "provider_stream_usage_unavailable"
520 };
521 let total_input_tokens = if token_metrics_available {
522 json!(output.total_input_tokens)
523 } else {
524 Value::Null
525 };
526 let total_output_tokens = if token_metrics_available {
527 json!(output.total_output_tokens)
528 } else {
529 Value::Null
530 };
531 let total_tokens = if token_metrics_available {
532 json!(output.total_tokens)
533 } else {
534 Value::Null
535 };
536 let total_reasoning_tokens = if token_metrics_available {
537 json!(output.total_reasoning_tokens)
538 } else {
539 Value::Null
540 };
541 let tokens_per_second = if token_metrics_available {
542 json!(output.tokens_per_second)
543 } else {
544 Value::Null
545 };
546
547 json!({
548 "workflow_id": output.workflow_id,
549 "terminal_node": output.terminal_node,
550 "total_elapsed_ms": output.total_elapsed_ms,
551 "ttft_ms": output.ttft_ms,
552 "step_details": output.step_timings,
553 "total_input_tokens": total_input_tokens,
554 "total_output_tokens": total_output_tokens,
555 "total_tokens": total_tokens,
556 "total_reasoning_tokens": total_reasoning_tokens,
557 "tokens_per_second": tokens_per_second,
558 "trace_id": output.trace_id,
559 "token_metrics_available": token_metrics_available,
560 "token_metrics_source": token_metrics_source,
561 "llm_nodes_without_usage": llm_nodes_without_usage,
562 })
563}
564
/// Exports nerdstats onto the root span as Langfuse trace-metadata attributes.
///
/// Sets one attribute with the full nerdstats JSON plus individual flattened
/// attributes for the headline numbers (string-valued, since `set_attribute`
/// takes `&str`). No-op when `enabled` is false.
fn apply_langfuse_nerdstats_attributes(
    span: &mut dyn WorkflowSpan,
    output: &YamlWorkflowRunOutput,
    enabled: bool,
) {
    if !enabled {
        return;
    }

    // Full summary as one JSON attribute.
    let nerdstats = workflow_nerdstats(output);
    let nerdstats_json = nerdstats.to_string();
    span.set_attribute("langfuse.trace.metadata.nerdstats", nerdstats_json.as_str());

    // Flattened headline metrics.
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.workflow_id",
        output.workflow_id.as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.terminal_node",
        output.terminal_node.as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.total_elapsed_ms",
        output.total_elapsed_ms.to_string().as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.step_details_count",
        output.step_timings.len().to_string().as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.total_input_tokens",
        output.total_input_tokens.to_string().as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.total_output_tokens",
        output.total_output_tokens.to_string().as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.total_tokens",
        output.total_tokens.to_string().as_str(),
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.tokens_per_second",
        output.tokens_per_second.to_string().as_str(),
    );

    // Optional metrics only when present.
    if let Some(ttft_ms) = output.ttft_ms {
        span.set_attribute(
            "langfuse.trace.metadata.nerdstats.ttft_ms",
            ttft_ms.to_string().as_str(),
        );
    }

    if let Some(reasoning_tokens) = output.total_reasoning_tokens {
        span.set_attribute(
            "langfuse.trace.metadata.nerdstats.total_reasoning_tokens",
            reasoning_tokens.to_string().as_str(),
        );
    }

    // Mirrors the availability logic in `workflow_nerdstats`: totals only
    // count as available when every llm_call step reported usage.
    let llm_nodes_without_usage_count = output
        .step_timings
        .iter()
        .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
        .count();
    let token_metrics_available = llm_nodes_without_usage_count == 0;
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.token_metrics_available",
        if token_metrics_available {
            "true"
        } else {
            "false"
        },
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.token_metrics_source",
        if token_metrics_available {
            "provider_usage"
        } else {
            "provider_stream_usage_unavailable"
        },
    );
    span.set_attribute(
        "langfuse.trace.metadata.nerdstats.llm_nodes_without_usage_count",
        llm_nodes_without_usage_count.to_string().as_str(),
    );
}
652
653fn apply_langfuse_trace_input_output_attributes(
654 span: &mut dyn WorkflowSpan,
655 workflow_input: &Value,
656 output: &YamlWorkflowRunOutput,
657 payload_mode: YamlWorkflowPayloadMode,
658) {
659 let trace_input = payload_for_span(payload_mode, workflow_input);
660 span.set_attribute("langfuse.trace.input", trace_input.as_str());
661
662 if let Some(terminal_output) = output.terminal_output.as_ref() {
663 let trace_output = payload_for_span(payload_mode, terminal_output);
664 span.set_attribute("langfuse.trace.output", trace_output.as_str());
665 }
666
667 let usage_details = json!({
668 "input": output.total_input_tokens,
669 "output": output.total_output_tokens,
670 "total": output.total_tokens,
671 "reasoning": output.total_reasoning_tokens,
672 })
673 .to_string();
674 span.set_attribute(
675 "langfuse.trace.metadata.usage_details",
676 usage_details.as_str(),
677 );
678}
679
680fn apply_langfuse_observation_usage_attributes(
681 span: &mut dyn WorkflowSpan,
682 usage: &YamlLlmTokenUsage,
683) {
684 let usage_details = json!({
685 "input": usage.prompt_tokens,
686 "output": usage.completion_tokens,
687 "total": usage.total_tokens,
688 "reasoning": usage.reasoning_tokens,
689 })
690 .to_string();
691 span.set_attribute("langfuse.observation.usage_details", usage_details.as_str());
692 span.set_attribute(
693 "gen_ai.usage.input_tokens",
694 usage.prompt_tokens.to_string().as_str(),
695 );
696 span.set_attribute(
697 "gen_ai.usage.output_tokens",
698 usage.completion_tokens.to_string().as_str(),
699 );
700 span.set_attribute(
701 "gen_ai.usage.total_tokens",
702 usage.total_tokens.to_string().as_str(),
703 );
704 if let Some(reasoning_tokens) = usage.reasoning_tokens {
705 span.set_attribute(
706 "gen_ai.usage.reasoning_tokens",
707 reasoning_tokens.to_string().as_str(),
708 );
709 }
710}
711
712fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
713 match mode {
714 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
715 YamlWorkflowPayloadMode::RedactedPayload => json!({
716 "redacted": true,
717 "value_type": match payload {
718 Value::Null => "null",
719 Value::Bool(_) => "bool",
720 Value::Number(_) => "number",
721 Value::String(_) => "string",
722 Value::Array(_) => "array",
723 Value::Object(_) => "object",
724 }
725 })
726 .to_string(),
727 }
728}
729
730fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
731 match mode {
732 YamlToolTraceMode::Full => payload.clone(),
733 YamlToolTraceMode::Redacted => json!({
734 "redacted": true,
735 "value_type": json_type_name(payload),
736 }),
737 YamlToolTraceMode::Off => Value::Null,
738 }
739}
740
741fn validate_json_schema(schema: &Value) -> Result<(), String> {
742 jsonschema::JSONSchema::compile(schema)
743 .map(|_| ())
744 .map_err(|error| format!("invalid JSON schema: {error}"))
745}
746
747fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
748 let validator = jsonschema::JSONSchema::compile(schema)
749 .map_err(|error| format!("invalid JSON schema: {error}"))?;
750 if let Err(errors) = validator.validate(instance) {
751 let message = errors
752 .into_iter()
753 .next()
754 .map(|error| error.to_string())
755 .unwrap_or_else(|| "unknown schema validation error".to_string());
756 return Err(format!("schema validation failed: {message}"));
757 }
758 Ok(())
759}
760
761fn schema_type(schema: &Value) -> Option<&str> {
762 schema.get("type").and_then(Value::as_str)
763}
764
765fn schema_expects_object(schema: &Value) -> bool {
766 schema_type(schema) == Some("object")
767}
768
769fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
770 options.trace.context.as_ref().map(|input| TraceContext {
771 trace_id: input.trace_id.clone(),
772 span_id: input.span_id.clone(),
773 parent_span_id: input.parent_span_id.clone(),
774 traceparent: input.traceparent.clone(),
775 tracestate: input.tracestate.clone(),
776 baggage: input.baggage.clone(),
777 })
778}
779
/// Merges the trace context handed to a worker, field by field.
///
/// Precedence per field: active span context first, then (for `trace_id` only)
/// the resolved run trace id, then the caller's input context. Baggage is the
/// exception: span baggage wins only when non-empty, otherwise the input
/// context's baggage (or empty) is used.
fn merged_trace_context_for_worker(
    span_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
    options: &YamlWorkflowRunOptions,
) -> TraceContext {
    let input_context = options.trace.context.as_ref();
    // Baggage: non-empty span baggage wins; otherwise fall back to input context.
    let baggage = if let Some(context) = span_context {
        if !context.baggage.is_empty() {
            context.baggage.clone()
        } else {
            input_context
                .map(|value| value.baggage.clone())
                .unwrap_or_default()
        }
    } else {
        input_context
            .map(|value| value.baggage.clone())
            .unwrap_or_default()
    };

    TraceContext {
        // trace_id additionally falls back to the run's resolved trace id
        // before the input context — unlike the other fields.
        trace_id: span_context
            .and_then(|context| context.trace_id.clone())
            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
        span_id: span_context
            .and_then(|context| context.span_id.clone())
            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
        parent_span_id: span_context
            .and_then(|context| context.parent_span_id.clone())
            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
        traceparent: span_context
            .and_then(|context| context.traceparent.clone())
            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
        tracestate: span_context
            .and_then(|context| context.tracestate.clone())
            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
        baggage,
    }
}
820
821fn custom_worker_context_with_trace(
822 context: &Value,
823 trace_context: &TraceContext,
824 tenant_context: &YamlWorkflowTraceTenantContext,
825) -> Value {
826 let mut context_with_trace = context.clone();
827 let Some(root) = context_with_trace.as_object_mut() else {
828 return context_with_trace;
829 };
830
831 root.insert(
832 "trace".to_string(),
833 json!({
834 "context": {
835 "trace_id": trace_context.trace_id,
836 "span_id": trace_context.span_id,
837 "parent_span_id": trace_context.parent_span_id,
838 "traceparent": trace_context.traceparent,
839 "tracestate": trace_context.tracestate,
840 "baggage": trace_context.baggage,
841 },
842 "tenant": {
843 "workspace_id": tenant_context.workspace_id,
844 "user_id": tenant_context.user_id,
845 "conversation_id": tenant_context.conversation_id,
846 "request_id": tenant_context.request_id,
847 "run_id": tenant_context.run_id,
848 }
849 }),
850 );
851
852 context_with_trace
853}
854
/// Debug switch: true when `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` is set
/// to a truthy value ("1", "true", "yes", "on"; case/whitespace-insensitive).
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
864
/// Result of resolving a streamed structured payload: the parsed value plus
/// the healer's confidence when JSON healing was used (None otherwise).
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    heal_confidence: Option<f32>,
}
870
871#[derive(Debug, Default)]
872struct StreamJsonAsTextFormatter {
873 raw_json: String,
874 emitted: bool,
875}
876
877impl StreamJsonAsTextFormatter {
878 fn push(&mut self, chunk: &str) {
879 self.raw_json.push_str(chunk);
880 }
881
882 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
883 if self.emitted || !complete {
884 return None;
885 }
886 self.emitted = true;
887 Some(render_json_object_as_text(self.raw_json.as_str()))
888 }
889}
890
891fn render_json_object_as_text(raw_json: &str) -> String {
892 let value = match serde_json::from_str::<Value>(raw_json) {
893 Ok(value) => value,
894 Err(_) => return raw_json.to_string(),
895 };
896 let Some(object) = value.as_object() else {
897 return raw_json.to_string();
898 };
899
900 let mut lines = Vec::with_capacity(object.len());
901 for (key, value) in object {
902 let rendered = match value {
903 Value::String(text) => text.clone(),
904 _ => value.to_string(),
905 };
906 lines.push(format!("{key}: {rendered}"));
907 }
908 lines.join("\n")
909}
910
/// Classifies a streamed token as visible output or "thinking" text
/// (see `StructuredJsonDeltaFilter::split`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
917
/// Incremental splitter for streamed structured-output deltas: characters
/// belonging to the first balanced top-level JSON object are "output", and
/// everything before/after it is "thinking". String literals and escapes are
/// tracked so braces inside strings don't affect nesting depth.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    started: bool,
    completed: bool,
    depth: u32,
    in_string: bool,
    escape: bool,
}

impl StructuredJsonDeltaFilter {
    /// Advances the state machine by one character; returns `true` when the
    /// character belongs to the JSON object (output), `false` for thinking.
    fn accept(&mut self, ch: char) -> bool {
        // Everything after the object closes is thinking text.
        if self.completed {
            return false;
        }

        // Before the first '{': pass characters through as thinking.
        if !self.started {
            if ch != '{' {
                return false;
            }
            self.started = true;
            self.depth = 1;
            return true;
        }

        // Inside a string literal, only escapes and the closing quote matter.
        if self.in_string {
            if self.escape {
                self.escape = false;
            } else if ch == '\\' {
                self.escape = true;
            } else if ch == '"' {
                self.in_string = false;
            }
            return true;
        }

        match ch {
            '"' => self.in_string = true,
            '{' => self.depth = self.depth.saturating_add(1),
            '}' => {
                if self.depth > 0 {
                    self.depth -= 1;
                }
                if self.depth == 0 {
                    self.completed = true;
                }
            }
            _ => {}
        }
        true
    }

    /// Splits a delta into `(output, thinking)` fragments; either side is
    /// `None` when it received no characters.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();
        for ch in delta.chars() {
            if self.accept(ch) {
                output.push(ch);
            } else {
                thinking.push(ch);
            }
        }

        let non_empty = |text: String| if text.is_empty() { None } else { Some(text) };
        (non_empty(output), non_empty(thinking))
    }

    /// True once the top-level JSON object has been fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
1002
/// Returns the trimmed contents of the LAST ```json fenced block in `raw`,
/// or `None` when there is no closed, non-empty fence.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    const FENCE_OPEN: &str = "```json";
    const FENCE_CLOSE: &str = "```";

    let open_at = raw.rfind(FENCE_OPEN)?;
    let body = &raw[open_at + FENCE_OPEN.len()..];
    let close_at = body.find(FENCE_CLOSE)?;
    let candidate = body[..close_at].trim();
    (!candidate.is_empty()).then_some(candidate)
}
1013
/// Scans forward from `start_index` (which must be a char boundary) and
/// returns the trimmed slice covering the first brace-balanced region, honoring
/// JSON string literals and escapes. `None` if braces never balance or a `}`
/// appears before any `{`.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut open_braces = 0u32;
    let mut inside_string = false;
    let mut pending_escape = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        if inside_string {
            // Braces inside string literals must not affect nesting depth.
            match (pending_escape, ch) {
                (true, _) => pending_escape = false,
                (false, '\\') => pending_escape = true,
                (false, '"') => inside_string = false,
                _ => {}
            }
        } else if ch == '"' {
            inside_string = true;
        } else if ch == '{' {
            open_braces = open_braces.saturating_add(1);
        } else if ch == '}' {
            if open_braces == 0 {
                return None;
            }
            open_braces -= 1;
            if open_braces == 0 {
                let end = start_index + offset + ch.len_utf8();
                return Some(raw[start_index..end].trim());
            }
        }
    }

    None
}
1054
1055fn extract_last_parsable_object(raw: &str) -> Option<&str> {
1056 let starts: Vec<usize> = raw
1057 .char_indices()
1058 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
1059 .collect();
1060
1061 for start in starts.into_iter().rev() {
1062 let Some(candidate) = extract_balanced_object_from(raw, start) else {
1063 continue;
1064 };
1065 if serde_json::from_str::<Value>(candidate).is_ok() {
1066 return Some(candidate);
1067 }
1068 }
1069
1070 None
1071}
1072
/// Best-effort JSON candidate from raw model output: prefers the last fenced
/// ```json block, falling back to the last balanced, parsable `{...}` object.
fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
}
1076
1077fn parse_streamed_structured_payload(
1078 raw: &str,
1079 heal: bool,
1080) -> Result<StreamedPayloadResolution, String> {
1081 if !heal {
1082 if let Ok(payload) = serde_json::from_str::<Value>(raw) {
1083 return Ok(StreamedPayloadResolution {
1084 payload,
1085 heal_confidence: None,
1086 });
1087 }
1088
1089 let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
1090 "failed to parse streamed structured completion JSON: no JSON object candidate found"
1091 .to_string()
1092 })?;
1093 let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
1094 format!(
1095 "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
1096 )
1097 })?;
1098 return Ok(StreamedPayloadResolution {
1099 payload,
1100 heal_confidence: None,
1101 });
1102 }
1103
1104 let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
1105 let parser = JsonishParser::new();
1106 let healed = parser
1107 .parse(candidate)
1108 .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;
1109
1110 Ok(StreamedPayloadResolution {
1111 payload: healed.value,
1112 heal_confidence: Some(healed.confidence),
1113 })
1114}
1115
/// A single progress/streaming event emitted while a YAML workflow runs.
///
/// Only `event_type` is always present; the optional fields are populated
/// depending on what the event describes (node lifecycle, token deltas, ...).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Free-form discriminator naming the kind of event.
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Incremental text chunk for streaming token events.
    pub delta: Option<String>,
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    /// Arbitrary JSON payload attached to the event.
    pub metadata: Option<Value>,
}
1130
/// Alias for the upstream chat `Role` type used in workflow messages.
pub type WorkflowMessageRole = Role;

/// A chat message as carried through workflow inputs and outputs.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    #[serde(default)]
    pub name: Option<String>,
    // Accepts both snake_case `tool_call_id` and camelCase `toolCallId`
    // when deserializing.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
1142
/// Record of one template expression resolved against workflow state.
///
/// NOTE(review): field semantics inferred from names — confirm against the
/// prompt-templating code that constructs these.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of this binding within the template.
    pub index: usize,
    /// The raw expression text as written in the template.
    pub expression: String,
    /// Path the expression was resolved from.
    pub source_path: String,
    /// The resolved JSON value (may be `Value::Null`).
    pub resolved: Value,
    /// String name describing the resolved value's JSON type.
    pub resolved_type: String,
    /// True when the expression did not resolve to any value.
    pub missing: bool,
}
1152
/// Severity level for a workflow validation diagnostic.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}

/// One validation finding, optionally attached to a specific node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Node the diagnostic refers to; `None` for workflow-level findings.
    pub node_id: Option<String>,
    /// Stable machine-readable code identifying the diagnostic.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description.
    pub message: String,
}
1166
/// Errors surfaced while loading, validating, converting, or executing a
/// YAML workflow. Display strings come from the `thiserror` attributes.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    // Validation failures carry the full diagnostic list alongside the count
    // used in the display string.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1211
/// Receiver for [`YamlWorkflowEvent`]s produced during a workflow run.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called with each event as it is produced.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// Cooperative-cancellation probe; the default implementation never
    /// requests cancellation.
    fn is_cancelled(&self) -> bool {
        false
    }
}
1219
/// Event sink that silently discards every event.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1225
/// Canonical message used when an event sink cancels a workflow run.
fn workflow_event_sink_cancelled_message() -> &'static str {
    const MESSAGE: &str = "workflow event callback cancelled";
    MESSAGE
}
1229
1230fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1231 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1232}
1233
/// Errors produced while lowering a YAML workflow into the canonical IR.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1243
1244pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
1246 if let Ok(ir) = yaml_workflow_to_ir(workflow) {
1247 return workflow_to_mermaid(&ir);
1248 }
1249
1250 yaml_workflow_to_mermaid_fallback(workflow)
1251}
1252
/// Renders a Mermaid `flowchart TD` directly from the YAML structure,
/// bypassing the canonical IR.
///
/// Used by [`yaml_workflow_to_mermaid`] when IR conversion fails. Tools
/// declared on `llm_call` nodes are drawn as stadium-shaped satellite nodes
/// with dotted arrows; edges are de-duplicated and sorted so the output is
/// deterministic.
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    // Declare every node; label reads "<id>\n(<kind>)".
    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            "    {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        // Each declared LLM tool becomes its own bubble linked by a dotted arrow.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    "    {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    "    {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Collect (from, label, to) triples in a set so duplicates collapse.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch branches contribute labelled edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // HashSet iteration order is unstable — sort for reproducible output.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                "    {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                "    {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    // Shared styling class applied to every tool bubble.
    if !tool_node_ids.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1334
/// Collects the declared names of all tools attached to an `llm_call` node,
/// regardless of which declaration flavor (OpenAI or simplified) was used.
fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
    llm.tools
        .iter()
        .map(|tool| match tool {
            YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
            YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
        })
        .collect()
}
1344
/// Reads a workflow YAML file and renders it as a Mermaid diagram.
///
/// When the workflow references sibling sub-workflows (via
/// `run_workflow_graph` tools), those are discovered from the same directory
/// and rendered as linked subgraphs.
///
/// # Errors
///
/// `YamlWorkflowRunError::Read` / `Parse` for file-level problems, including
/// reads of sibling YAML files during subgraph discovery.
pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
    if referenced_subgraphs.is_empty() {
        return Ok(yaml_workflow_to_mermaid(&workflow));
    }

    Ok(yaml_workflow_to_mermaid_with_subgraphs(
        &workflow,
        &referenced_subgraphs,
    ))
}
1369
/// A sibling workflow referenced by the main workflow, keyed by the alias
/// used to reference it.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    alias: String,
    workflow: YamlWorkflow,
}

/// Intermediate output of rendering one workflow as a Mermaid block.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    /// Rendered Mermaid statements (unindented; caller adds nesting).
    lines: Vec<String>,
    /// IDs of all tool bubbles, used for class styling.
    tool_node_ids: Vec<String>,
    /// Subset of tool bubbles whose tool is `run_workflow_graph`.
    run_workflow_tool_node_ids: Vec<String>,
    /// Prefixed Mermaid ID of the workflow's entry node.
    entry_node_id: String,
}
1383
/// Renders the main workflow plus each referenced sub-workflow as nested
/// Mermaid `subgraph` clusters, with dotted "calls" edges from every
/// `run_workflow_graph` tool bubble to each subgraph's entry node.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    // Main workflow in its own cluster; node IDs are prefixed with "main".
    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        "    subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!("        {line}"));
    }
    lines.push("    end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        // Numeric prefix keeps node IDs unique across clusters.
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            "    subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!("        {line}"));
        }
        lines.push("    end".to_string());

        // Link every run_workflow_graph tool bubble in the main block to this
        // subgraph's entry node.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                "    {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    // One shared styling class for tool bubbles across all clusters.
    if !all_tool_nodes.is_empty() {
        lines.push("    classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!("    class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1434
/// Renders one workflow's nodes, tool bubbles, and edges as Mermaid
/// statements with every node ID namespaced by `prefix` (so multiple
/// workflows can coexist in one diagram).
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        // LLM tools become satellite bubbles; run_workflow_graph tools are
        // tracked separately so callers can draw cross-subgraph edges.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // De-duplicate edges via a set of (from, label, to) triples.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch branches add labelled edges plus a "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1510
/// Finds sibling YAML workflows (same directory as `workflow_path`) that
/// `workflow` references through `run_workflow_graph` tools.
///
/// Returns one entry per distinct referenced workflow id (first alias wins);
/// references with no matching sibling file are silently skipped.
///
/// # Errors
///
/// Propagates read/parse errors from scanning the sibling directory.
fn discover_referenced_subgraphs(
    workflow_path: &Path,
    workflow: &YamlWorkflow,
) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
    let workflow_ids = referenced_workflow_ids(workflow);
    if workflow_ids.is_empty() {
        return Ok(Vec::new());
    }

    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;

    let mut discovered = Vec::new();
    let mut seen = HashSet::new();

    for workflow_id in workflow_ids {
        // Lookups are case/hyphen-insensitive via the normalized key.
        let normalized = normalize_workflow_lookup_key(&workflow_id);
        if seen.contains(&normalized) {
            continue;
        }

        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
        {
            discovered.push(MermaidSubgraphWorkflow {
                alias: workflow_id.clone(),
                workflow: subworkflow.clone(),
            });
            seen.insert(normalized);
        }
    }

    Ok(discovered)
}
1544
/// Parses every `.yaml`/`.yml` file in `parent_dir` except `workflow_path`
/// itself, indexing each workflow under both its file stem and its declared
/// `id` (normalized for lookup).
///
/// # Errors
///
/// Fails on the first unreadable directory entry or unparseable YAML file.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // NOTE(review): literal path comparison — paths are not canonicalized,
        // so a symlinked or differently-spelled duplicate of the main file
        // could slip through; confirm callers pass consistent paths.
        if path == workflow_path {
            continue;
        }

        let contents =
            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
                path: path.display().to_string(),
                source,
            })?;
        let subworkflow: YamlWorkflow =
            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
                path: path.display().to_string(),
                source,
            })?;

        // Index under the file stem and the declared workflow id so either
        // form of reference resolves.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1588
/// Collects the distinct workflow ids that `workflow` references through
/// `run_workflow_graph` tools — both from tool schemas (`const`/`enum` on the
/// `workflow_id` property) and from `"workflow_id": "..."` occurrences in the
/// node prompt text.
///
/// Order of first occurrence is preserved; de-duplication uses the
/// normalized lookup key.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                // Only run_workflow_graph tools can reference sub-workflows.
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    let normalized = normalize_workflow_lookup_key(&workflow_id);
                    if seen.insert(normalized) {
                        ids.push(workflow_id);
                    }
                }

                // Prompts may also name workflows directly in example JSON.
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        let normalized = normalize_workflow_lookup_key(&workflow_id);
                        if seen.insert(normalized) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1626
/// Extracts workflow ids from a tool's JSON schema: the `const` value and/or
/// `enum` values of the top-level `workflow_id` property.
fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
    // OpenAI-style declarations may omit parameters; simplified ones always
    // carry an input schema.
    let schema = match tool {
        YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
        YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
    };

    let mut ids = Vec::new();
    if let Some(schema) = schema {
        if let Some(workflow_prop) = schema
            .get("properties")
            .and_then(Value::as_object)
            .and_then(|properties| properties.get("workflow_id"))
        {
            if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
                ids.push(value.to_string());
            }

            if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
                for value in enum_values {
                    if let Some(value) = value.as_str() {
                        ids.push(value.to_string());
                    }
                }
            }
        }
    }

    ids
}
1656
/// Scans free prompt text for `"workflow_id": "<id>"` occurrences and
/// returns the quoted, non-empty ids in order of appearance.
///
/// Unquoted or malformed values are skipped; scanning resumes after the
/// colon so later occurrences are still found.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";
    let mut found = Vec::new();
    let mut cursor = prompt;

    while let Some(key_at) = cursor.find(KEY) {
        let after_key = &cursor[key_at + KEY.len()..];
        let Some(colon_at) = after_key.find(':') else {
            break;
        };
        let after_colon = &after_key[colon_at + 1..];

        // A usable value is a double-quoted string with a closing quote.
        let quoted = after_colon
            .trim_start()
            .strip_prefix('"')
            .and_then(|body| body.find('"').map(|close| (body, close)));

        match quoted {
            Some((body, close)) => {
                let id = body[..close].trim();
                if !id.is_empty() {
                    found.push(id.to_string());
                }
                cursor = &body[close + 1..];
            }
            // Not a quoted string — skip past the colon and keep scanning.
            None => cursor = after_colon,
        }
    }

    found
}
1684
/// Normalizes a workflow id or file stem for lookup: ASCII-lowercased with
/// hyphens replaced by underscores.
fn normalize_workflow_lookup_key(value: &str) -> String {
    // '-' lowercases to itself, so lowercasing first and then swapping
    // hyphens for underscores matches the original per-char mapping.
    value.to_ascii_lowercase().replace('-', "_")
}
1694
/// True when `path` has a (case-sensitive) `yaml` or `yml` extension.
fn is_yaml_file(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ext == "yaml" || ext == "yml")
}
1701
1702fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1703 sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1704}
1705
/// Returns the declared tool name regardless of declaration flavor.
fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
    match tool {
        YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
        YamlToolDeclaration::Simplified(simple) => &simple.name,
    }
}
1712
/// Lowers a YAML workflow into the canonical IR [`WorkflowDefinition`].
///
/// A synthetic start node (`YAML_START_NODE_ID`) pointing at the YAML entry
/// node is prepended. `llm_call` nodes become IR tool nodes invoking the
/// pseudo-tool `YAML_LLM_TOOL_ID` with the node's configuration as input,
/// `custom_worker` nodes become plain tool nodes, and `switch` nodes become
/// routers.
///
/// # Errors
///
/// * `MissingEntry` — the entry node is not among the workflow's nodes.
/// * `UnsupportedNode` — the node uses `set_globals`/`update_globals` or
///   `llm_call.tools` (no IR representation yet), or its kind is not
///   llm_call/switch/custom_worker.
/// * `MultipleOutgoingEdge` — an llm/tool node has more than one outgoing
///   YAML edge.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of YAML edges, used to resolve each node's `next`.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    // Synthetic start node so the IR has an explicit entry point.
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR encoding for llm nodes yet.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // Encode the llm call as an invocation of the llm pseudo-tool;
            // the input payload carries everything the executor needs.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same restriction as llm nodes: globals mutation unsupported.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        // Switch branches map 1:1 onto router routes; conditions are
        // rewritten into the IR's path syntax.
        if let Some(switch) = node.node_type.switch.as_ref() {
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1860
1861fn single_next_for_node(
1862 outgoing: &HashMap<&str, Vec<&str>>,
1863 node_id: &str,
1864) -> Result<Option<String>, YamlToIrError> {
1865 match outgoing.get(node_id) {
1866 None => Ok(None),
1867 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1868 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1869 node_id: node_id.to_string(),
1870 }),
1871 }
1872}
1873
/// Rewrites a YAML switch condition into the IR's path syntax:
/// `$.nodes.` becomes `$.node_outputs.` and the `.output` hop is removed,
/// whether it appears mid-path or as the trailing segment.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let mut rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    // A trailing ".output" (no following segment) is dropped in place.
    if rewritten.ends_with(".output") {
        rewritten.truncate(rewritten.len() - ".output".len());
    }
    rewritten
}
1884
/// Makes `id` safe to use as a Mermaid node identifier.
///
/// Ids that don't begin with an ASCII letter or `_` (including the empty
/// string) get an `n_` prefix; every remaining non-alphanumeric,
/// non-underscore character is replaced with `_`.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_ok = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    let mut sanitized = String::with_capacity(id.len() + 2);
    if !starts_ok {
        sanitized.push_str("n_");
    }
    for ch in id.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            sanitized.push(ch);
        } else {
            sanitized.push('_');
        }
    }
    sanitized
}
1908
/// Escapes double quotes so `label` can sit inside a quoted Mermaid label.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push_str("\\\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
1912
/// Everything an executor needs to run one LLM node of a YAML workflow.
///
/// NOTE(review): several field semantics are inferred from names — confirm
/// against the code that builds these requests.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    pub node_id: String,
    /// Whether this node is a terminal node of the workflow.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    pub model: String,
    /// Pre-built conversation, if the node supplies one.
    pub messages: Option<Vec<Message>>,
    /// When true, the rendered prompt is appended as a user message.
    pub append_prompt_as_user: bool,
    /// Fully rendered prompt text.
    pub prompt: String,
    /// The prompt before template resolution.
    pub prompt_template: String,
    /// How each template expression in the prompt was resolved.
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    /// JSON schema the structured output must satisfy.
    pub schema: Value,
    pub stream: bool,
    /// Whether malformed JSON output should be healed leniently.
    pub heal: bool,
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    pub execution_context: Value,
    pub email_text: String,
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub tenant_context: YamlWorkflowTraceTenantContext,
    pub trace_sampled: bool,
}
1939
/// A tool declaration resolved into an executable definition, plus the
/// optional schema its output should conform to.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    pub output_schema: Option<Value>,
}
1945
/// Executes the LLM call for one workflow node and returns its structured
/// result; implementations may emit progress through `event_sink`.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}

/// Executes a `custom_worker` node identified by its handler name.
///
/// `payload` is the node's configured input, `email_text` the workflow's
/// email input, and `context` the current execution context.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1965
/// Reads `workflow_path`, parses it as a `YamlWorkflow`, and runs it with
/// `executor`.
///
/// # Errors
///
/// `YamlWorkflowRunError::Read`/`Parse` for file problems, otherwise
/// whatever the run itself returns.
pub async fn run_workflow_yaml_file(
    workflow_path: &Path,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml(&workflow, workflow_input, executor).await
}

/// Email convenience wrapper: wraps `email_text` as `{"email_text": ...}`
/// workflow input and delegates to [`run_workflow_yaml_file`].
pub async fn run_email_workflow_yaml_file(
    workflow_path: &Path,
    email_text: &str,
    executor: &dyn YamlWorkflowLlmExecutor,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
}
1994
/// Reads and parses `workflow_path`, then runs the workflow using a
/// `SimpleAgentsClient` for LLM calls.
///
/// # Errors
///
/// `YamlWorkflowRunError::Read`/`Parse` for file problems, otherwise
/// whatever the run itself returns.
pub async fn run_workflow_yaml_file_with_client(
    workflow_path: &Path,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml_with_client(&workflow, workflow_input, client).await
}

/// Email convenience wrapper over [`run_workflow_yaml_file_with_client`].
pub async fn run_email_workflow_yaml_file_with_client(
    workflow_path: &Path,
    email_text: &str,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
}
2023
/// Runs an already-parsed workflow with `client`, no custom worker, no event
/// sink, and default options.
pub async fn run_workflow_yaml_with_client(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
}

/// Email convenience wrapper over [`run_workflow_yaml_with_client`].
pub async fn run_email_workflow_yaml_with_client(
    workflow: &YamlWorkflow,
    email_text: &str,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_with_client(workflow, &workflow_input, client).await
}
2040
/// Reads and parses `workflow_path`, then runs the workflow with `client`
/// and an optional custom-worker executor.
///
/// # Errors
///
/// `YamlWorkflowRunError::Read`/`Parse` for file problems, otherwise
/// whatever the run itself returns.
pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
    workflow_path: &Path,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml_with_client_and_custom_worker(
        &workflow,
        workflow_input,
        client,
        custom_worker,
    )
    .await
}

/// Email convenience wrapper over
/// [`run_workflow_yaml_file_with_client_and_custom_worker`].
pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
    workflow_path: &Path,
    email_text: &str,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file_with_client_and_custom_worker(
        workflow_path,
        &workflow_input,
        client,
        custom_worker,
    )
    .await
}
2083
/// Same as the `_and_events_and_options` variant below but with default
/// [`YamlWorkflowRunOptions`].
pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
    workflow_path: &Path,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        workflow_path,
        workflow_input,
        client,
        custom_worker,
        event_sink,
        &YamlWorkflowRunOptions::default(),
    )
    .await
}
2101
/// Fully-parameterized file entry point: reads and parses `workflow_path`,
/// then runs the workflow with `client`, optional custom worker, optional
/// event sink, and explicit run options.
///
/// # Errors
///
/// `YamlWorkflowRunError::Read`/`Parse` for file problems, otherwise
/// whatever the run itself returns.
pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
    workflow_path: &Path,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        workflow_input,
        client,
        custom_worker,
        event_sink,
        options,
    )
    .await
}

/// Email convenience wrapper over
/// [`run_workflow_yaml_file_with_client_and_custom_worker_and_events`].
pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
    workflow_path: &Path,
    email_text: &str,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_file_with_client_and_custom_worker_and_events(
        workflow_path,
        &workflow_input,
        client,
        custom_worker,
        event_sink,
    )
    .await
}
2150
/// Runs an already-parsed workflow with `client` and an optional custom
/// worker; no event sink, default options.
pub async fn run_workflow_yaml_with_client_and_custom_worker(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        client,
        custom_worker,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
}

/// Email convenience wrapper over
/// [`run_workflow_yaml_with_client_and_custom_worker`].
pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
    workflow: &YamlWorkflow,
    email_text: &str,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    let workflow_input = json!({ "email_text": email_text });
    run_workflow_yaml_with_client_and_custom_worker(
        workflow,
        &workflow_input,
        client,
        custom_worker,
    )
    .await
}
2183
2184pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2185 workflow: &YamlWorkflow,
2186 workflow_input: &Value,
2187 client: &SimpleAgentsClient,
2188 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2189 event_sink: Option<&dyn YamlWorkflowEventSink>,
2190) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2191 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2192 workflow,
2193 workflow_input,
2194 client,
2195 custom_worker,
2196 event_sink,
2197 &YamlWorkflowRunOptions::default(),
2198 )
2199 .await
2200}
2201
/// Runs a YAML workflow against a borrowed [`SimpleAgentsClient`] by wrapping
/// the client in a throwaway [`YamlWorkflowLlmExecutor`] adapter and delegating
/// to the executor-based runner.
///
/// The adapter's `complete_structured` handles two paths:
/// - a tool-enabled loop (when `request.tools` is non-empty) that alternates
///   model completions with tool executions up to `max_tool_roundtrips`;
/// - a plain completion path (streaming or not) with optional JSON healing.
pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    // Function-local adapter: borrows the caller's client/worker and owns a
    // clone of the run options so sub-workflow tool calls can reuse them.
    struct BorrowedClientExecutor<'a> {
        client: &'a SimpleAgentsClient,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        run_options: YamlWorkflowRunOptions,
    }

    #[async_trait]
    impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
        /// Executes one LLM step of the workflow, returning the parsed payload,
        /// token usage, optional time-to-first-token, and any tool-call traces.
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            let expects_object = schema_expects_object(&request.schema);
            // Build the conversation: reuse supplied history (optionally
            // appending the prompt as a user turn) or start a fresh one.
            let messages = if let Some(mut history) = request.messages.clone() {
                if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
                    history.push(Message::user(&request.prompt));
                }
                history
            } else {
                vec![
                    Message::system("You execute workflow classification steps."),
                    Message::user(&request.prompt),
                ]
            };

            // ---- Tool-enabled path: completion/tool roundtrip loop. ----
            if !request.tools.is_empty() {
                let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
                let mut conversation = messages;
                // Accumulated token usage across all roundtrips.
                let mut usage_total: Option<YamlLlmTokenUsage> = None;

                for roundtrip in 0..=request.max_tool_roundtrips {
                    let mut builder = CompletionRequest::builder()
                        .model(&request.model)
                        .messages(conversation.clone())
                        .tools(request.tools.iter().map(|t| t.definition.clone()).collect());

                    if request.heal && expects_object {
                        builder = builder.json_schema("workflow_step", request.schema.clone());
                    }

                    if let Some(choice) = request.tool_choice.clone() {
                        builder = builder.tool_choice(choice);
                    }

                    if request.stream {
                        builder = builder.stream(true);
                    }

                    let completion_request = builder
                        .build()
                        .map_err(|error| format!("failed to build completion request: {error}"))?;

                    let outcome = self
                        .client
                        .complete(&completion_request, CompletionOptions::default())
                        .await
                        .map_err(|error| error.to_string())?;

                    // Normalized view of this roundtrip's completion,
                    // regardless of which outcome variant produced it.
                    let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
                    let mut streamed_content = String::new();
                    let mut finish_reason = FinishReason::Stop;

                    match outcome {
                        CompletionOutcome::Response(response) => {
                            // Merge this response's usage into the running total.
                            if let Some(usage) = usage_total.as_mut() {
                                usage.prompt_tokens += response.usage.prompt_tokens;
                                usage.completion_tokens += response.usage.completion_tokens;
                                usage.total_tokens += response.usage.total_tokens;
                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
                                    usage.reasoning_tokens = Some(
                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
                                    );
                                }
                            } else {
                                usage_total = Some(YamlLlmTokenUsage {
                                    prompt_tokens: response.usage.prompt_tokens,
                                    completion_tokens: response.usage.completion_tokens,
                                    total_tokens: response.usage.total_tokens,
                                    reasoning_tokens: response.usage.reasoning_tokens,
                                });
                            }

                            let choice = response
                                .choices
                                .first()
                                .ok_or_else(|| "completion returned no choices".to_string())?;
                            streamed_content = choice.message.content.clone();
                            streamed_tool_calls = choice.message.tool_calls.clone();
                            finish_reason = choice.finish_reason;
                        }
                        CompletionOutcome::HealedJson(healed) => {
                            let response = healed.response;
                            // Same usage merge as the plain-response arm.
                            if let Some(usage) = usage_total.as_mut() {
                                usage.prompt_tokens += response.usage.prompt_tokens;
                                usage.completion_tokens += response.usage.completion_tokens;
                                usage.total_tokens += response.usage.total_tokens;
                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
                                    usage.reasoning_tokens = Some(
                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
                                    );
                                }
                            } else {
                                usage_total = Some(YamlLlmTokenUsage {
                                    prompt_tokens: response.usage.prompt_tokens,
                                    completion_tokens: response.usage.completion_tokens,
                                    total_tokens: response.usage.total_tokens,
                                    reasoning_tokens: response.usage.reasoning_tokens,
                                });
                            }

                            let choice = response
                                .choices
                                .first()
                                .ok_or_else(|| "completion returned no choices".to_string())?;
                            streamed_content = choice.message.content.clone();
                            streamed_tool_calls = choice.message.tool_calls.clone();
                            finish_reason = choice.finish_reason;
                        }
                        CompletionOutcome::CoercedSchema(coerced) => {
                            let response = coerced.response;
                            // Same usage merge as the plain-response arm.
                            if let Some(usage) = usage_total.as_mut() {
                                usage.prompt_tokens += response.usage.prompt_tokens;
                                usage.completion_tokens += response.usage.completion_tokens;
                                usage.total_tokens += response.usage.total_tokens;
                                if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
                                    usage.reasoning_tokens = Some(
                                        usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
                                    );
                                }
                            } else {
                                usage_total = Some(YamlLlmTokenUsage {
                                    prompt_tokens: response.usage.prompt_tokens,
                                    completion_tokens: response.usage.completion_tokens,
                                    total_tokens: response.usage.total_tokens,
                                    reasoning_tokens: response.usage.reasoning_tokens,
                                });
                            }

                            let choice = response
                                .choices
                                .first()
                                .ok_or_else(|| "completion returned no choices".to_string())?;
                            streamed_content = choice.message.content.clone();
                            streamed_tool_calls = choice.message.tool_calls.clone();
                            finish_reason = choice.finish_reason;
                        }
                        CompletionOutcome::Stream(mut stream) => {
                            let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
                                None;
                            let mut delta_filter = StructuredJsonDeltaFilter::default();
                            let include_raw_debug = include_raw_stream_debug_events();
                            let mut json_text_formatter = if request.stream_json_as_text {
                                Some(StreamJsonAsTextFormatter::default())
                            } else {
                                None
                            };
                            // Tool-call fragments are keyed by delta index and
                            // assembled incrementally across chunks.
                            let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();

                            while let Some(chunk_result) = stream.next().await {
                                // Check for cancellation before processing each chunk.
                                if event_sink_is_cancelled(event_sink) {
                                    return Err(workflow_event_sink_cancelled_message().to_string());
                                }

                                let chunk = chunk_result.map_err(|error| error.to_string())?;
                                if let Some(usage) = chunk.usage {
                                    final_stream_usage = Some(usage);
                                }

                                if let Some(choice) = chunk.choices.first() {
                                    if let Some(chunk_finish_reason) = choice.finish_reason {
                                        finish_reason = chunk_finish_reason;
                                    }

                                    // Raw debug events mirror provider-native
                                    // reasoning deltas when enabled.
                                    if include_raw_debug {
                                        if let Some(reasoning_delta) =
                                            choice.delta.reasoning_content.as_ref()
                                        {
                                            if let Some(sink) = event_sink {
                                                sink.emit(&YamlWorkflowEvent {
                                                    event_type: "node_stream_thinking_delta"
                                                        .to_string(),
                                                    node_id: Some(request.node_id.clone()),
                                                    step_id: Some(request.node_id.clone()),
                                                    node_kind: Some("llm_call".to_string()),
                                                    streamable: Some(true),
                                                    message: None,
                                                    delta: Some(reasoning_delta.clone()),
                                                    token_kind: Some(
                                                        YamlWorkflowTokenKind::Thinking,
                                                    ),
                                                    is_terminal_node_token: Some(
                                                        request.is_terminal_node,
                                                    ),
                                                    elapsed_ms: None,
                                                    metadata: None,
                                                });
                                            }
                                        }
                                    }

                                    if let Some(delta) = choice.delta.content.clone() {
                                        streamed_content.push_str(delta.as_str());
                                        // For object schemas, split the raw delta
                                        // into output vs. thinking portions.
                                        let (output_delta, thinking_delta) = if expects_object {
                                            delta_filter.split(delta.as_str())
                                        } else {
                                            (Some(delta.clone()), None)
                                        };
                                        let rendered_output_delta = if let Some(output_chunk) =
                                            output_delta
                                        {
                                            if let Some(formatter) = json_text_formatter.as_mut() {
                                                formatter.push(output_chunk.as_str());
                                                formatter.emit_if_ready(delta_filter.completed())
                                            } else {
                                                Some(output_chunk)
                                            }
                                        } else {
                                            None
                                        };

                                        if include_raw_debug {
                                            if let Some(sink) = event_sink {
                                                if let Some(raw_thinking_delta) =
                                                    thinking_delta.as_ref()
                                                {
                                                    sink.emit(&YamlWorkflowEvent {
                                                        event_type: "node_stream_thinking_delta"
                                                            .to_string(),
                                                        node_id: Some(request.node_id.clone()),
                                                        step_id: Some(request.node_id.clone()),
                                                        node_kind: Some("llm_call".to_string()),
                                                        streamable: Some(true),
                                                        message: None,
                                                        delta: Some(raw_thinking_delta.clone()),
                                                        token_kind: Some(
                                                            YamlWorkflowTokenKind::Thinking,
                                                        ),
                                                        is_terminal_node_token: Some(
                                                            request.is_terminal_node,
                                                        ),
                                                        elapsed_ms: None,
                                                        metadata: None,
                                                    });
                                                }
                                                if let Some(raw_output_delta) =
                                                    rendered_output_delta.as_ref()
                                                {
                                                    sink.emit(&YamlWorkflowEvent {
                                                        event_type: "node_stream_output_delta"
                                                            .to_string(),
                                                        node_id: Some(request.node_id.clone()),
                                                        step_id: Some(request.node_id.clone()),
                                                        node_kind: Some("llm_call".to_string()),
                                                        streamable: Some(true),
                                                        message: None,
                                                        delta: Some(raw_output_delta.clone()),
                                                        token_kind: Some(
                                                            YamlWorkflowTokenKind::Output,
                                                        ),
                                                        is_terminal_node_token: Some(
                                                            request.is_terminal_node,
                                                        ),
                                                        elapsed_ms: None,
                                                        metadata: None,
                                                    });
                                                }
                                            }
                                        }

                                        // Primary consumer-facing stream event.
                                        if let Some(filtered_delta) = rendered_output_delta {
                                            if let Some(sink) = event_sink {
                                                sink.emit(&YamlWorkflowEvent {
                                                    event_type: "node_stream_delta".to_string(),
                                                    node_id: Some(request.node_id.clone()),
                                                    step_id: Some(request.node_id.clone()),
                                                    node_kind: Some("llm_call".to_string()),
                                                    streamable: Some(true),
                                                    message: None,
                                                    delta: Some(filtered_delta),
                                                    token_kind: Some(YamlWorkflowTokenKind::Output),
                                                    is_terminal_node_token: Some(
                                                        request.is_terminal_node,
                                                    ),
                                                    elapsed_ms: None,
                                                    metadata: None,
                                                });
                                            }
                                        }
                                    }

                                    // Accumulate streamed tool-call fragments:
                                    // ids/names overwrite, arguments append.
                                    if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
                                    {
                                        for tool_call_delta in tool_call_deltas {
                                            let entry = tool_calls_by_index
                                                .entry(tool_call_delta.index)
                                                .or_insert_with(|| ToolCall {
                                                    id: tool_call_delta.id.clone().unwrap_or_else(
                                                        || {
                                                            format!(
                                                                "tool_call_{}",
                                                                tool_call_delta.index
                                                            )
                                                        },
                                                    ),
                                                    tool_type: ToolType::Function,
                                                    function:
                                                        simple_agent_type::tool::ToolCallFunction {
                                                            name: String::new(),
                                                            arguments: String::new(),
                                                        },
                                                });

                                            if let Some(id) = tool_call_delta.id.as_ref() {
                                                entry.id = id.clone();
                                            }
                                            if let Some(tool_type) = tool_call_delta.tool_type {
                                                entry.tool_type = tool_type;
                                            }
                                            if let Some(function_delta) =
                                                tool_call_delta.function.as_ref()
                                            {
                                                if let Some(name) = function_delta.name.as_ref() {
                                                    entry.function.name = name.clone();
                                                }
                                                if let Some(arguments) =
                                                    function_delta.arguments.as_ref()
                                                {
                                                    entry.function.arguments.push_str(arguments);
                                                }
                                            }
                                        }
                                    }
                                }

                                if event_sink_is_cancelled(event_sink) {
                                    return Err(workflow_event_sink_cancelled_message().to_string());
                                }
                            }

                            // Fold the stream's final usage report into the total.
                            if let Some(usage) = final_stream_usage {
                                if let Some(total) = usage_total.as_mut() {
                                    total.prompt_tokens += usage.prompt_tokens;
                                    total.completion_tokens += usage.completion_tokens;
                                    total.total_tokens += usage.total_tokens;
                                    if let Some(reasoning_tokens) = usage.reasoning_tokens {
                                        total.reasoning_tokens = Some(
                                            total.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
                                        );
                                    }
                                } else {
                                    usage_total = Some(YamlLlmTokenUsage {
                                        prompt_tokens: usage.prompt_tokens,
                                        completion_tokens: usage.completion_tokens,
                                        total_tokens: usage.total_tokens,
                                        reasoning_tokens: usage.reasoning_tokens,
                                    });
                                }
                            }

                            // Restore provider ordering of assembled tool calls.
                            let mut ordered_tool_calls =
                                tool_calls_by_index.into_iter().collect::<Vec<_>>();
                            ordered_tool_calls.sort_by_key(|(index, _)| *index);
                            if !ordered_tool_calls.is_empty() {
                                streamed_tool_calls = Some(
                                    ordered_tool_calls
                                        .into_iter()
                                        .map(|(_, tool_call)| tool_call)
                                        .collect::<Vec<_>>(),
                                );
                            }
                        }
                    }

                    // No tool calls requested: this roundtrip's content is the
                    // final payload for the step.
                    let has_tool_calls = streamed_tool_calls
                        .as_ref()
                        .is_some_and(|calls| !calls.is_empty());
                    if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
                        let payload = if expects_object {
                            parse_streamed_structured_payload(
                                streamed_content.as_str(),
                                request.heal,
                            )
                            .map_err(|error| {
                                format!("failed to parse structured completion JSON: {error}")
                            })?
                            .payload
                        } else {
                            Value::String(streamed_content.clone())
                        };
                        return Ok(YamlLlmExecutionResult {
                            payload,
                            usage: usage_total,
                            ttft_ms: None,
                            tool_calls: tool_traces,
                        });
                    }

                    // The model wants more tool calls but we are on the last
                    // allowed iteration: refuse rather than loop forever.
                    if roundtrip >= request.max_tool_roundtrips {
                        return Err(format!(
                            "tool call roundtrip limit reached for node '{}' (max={})",
                            request.node_id, request.max_tool_roundtrips
                        ));
                    }

                    let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
                        "finish_reason=tool_calls but no tool calls found".to_string()
                    })?;
                    if tool_calls
                        .iter()
                        .any(|tool_call| tool_call.function.name.trim().is_empty())
                    {
                        return Err("streamed tool call missing function name".to_string());
                    }

                    // Record the assistant turn (with its tool calls) before
                    // appending the tool results.
                    let assistant_tool_message =
                        Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
                    conversation.push(assistant_tool_message);

                    // ---- Execute each requested tool call in order. ----
                    for tool_call in tool_calls {
                        let tool_call_id = tool_call.id.clone();
                        let tool_name = tool_call.function.name.clone();
                        let tool_started = Instant::now();
                        let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
                            .map_err(|error| {
                                format!(
                                    "tool '{}' arguments must be valid JSON: {}",
                                    tool_name, error
                                )
                            })?;
                        // Open a tracing span for this tool execution when the
                        // trace is sampled.
                        let mut tool_span_context: Option<TraceContext> = None;
                        let mut tool_span = if request.trace_sampled {
                            let (span_context, mut span) = workflow_tracer().start_span(
                                "workflow.tool.execute",
                                SpanKind::Node,
                                request.trace_context.as_ref(),
                            );
                            tool_span_context = Some(span_context);
                            apply_trace_identity_attributes(
                                span.as_mut(),
                                request.trace_id.as_deref(),
                            );
                            apply_trace_tenant_attributes_from_tenant(
                                span.as_mut(),
                                &request.tenant_context,
                            );
                            span.set_attribute("node_id", request.node_id.as_str());
                            span.set_attribute("node_kind", "llm_call");
                            span.set_attribute("tool_name", tool_name.as_str());
                            span.set_attribute("tool_call_id", tool_call_id.as_str());
                            let args_for_span =
                                payload_for_tool_trace(request.tool_trace_mode, &arguments)
                                    .to_string();
                            span.set_attribute("tool_arguments", args_for_span.as_str());
                            Some(span)
                        } else {
                            None
                        };

                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_requested".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call requested: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                    })),
                                });
                            }
                        }

                        // The model may only invoke tools declared in the request.
                        let Some(tool_config) = request
                            .tools
                            .iter()
                            .find(|tool| tool.definition.function.name == tool_name)
                        else {
                            return Err(format!("model requested unknown tool '{}'", tool_name));
                        };

                        // Dispatch: built-in sub-workflow tool, then the custom
                        // worker, otherwise fail.
                        let tool_output_result = if tool_name == "run_workflow_graph" {
                            execute_subworkflow_tool_call(
                                &arguments,
                                &request.execution_context,
                                self.client,
                                self.custom_worker,
                                &self.run_options,
                                tool_span_context.as_ref(),
                                request.trace_id.as_deref(),
                            )
                            .await
                        } else if let Some(custom_worker) = self.custom_worker {
                            custom_worker
                                .execute(
                                    tool_name.as_str(),
                                    &arguments,
                                    request.email_text.as_str(),
                                    &request.execution_context,
                                )
                                .await
                        } else {
                            Err(format!(
                                "tool '{}' requested but no custom worker executor is configured",
                                tool_name
                            ))
                        };

                        let tool_output = match tool_output_result {
                            Ok(output) => output,
                            Err(message) => {
                                // On failure: annotate the span, emit the failure
                                // event, record the trace, close the span, abort.
                                let elapsed_ms = tool_started.elapsed().as_millis();
                                if let Some(span) = tool_span.as_mut() {
                                    span.add_event("workflow.tool.execute.error");
                                    span.set_attribute("tool_status", "error");
                                    span.set_attribute("tool_error", message.as_str());
                                    span.set_attribute(
                                        "elapsed_ms",
                                        elapsed_ms.to_string().as_str(),
                                    );
                                }
                                if request.tool_trace_mode != YamlToolTraceMode::Off {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_tool_call_failed".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(false),
                                            message: Some(message.clone()),
                                            delta: None,
                                            token_kind: None,
                                            is_terminal_node_token: None,
                                            elapsed_ms: Some(elapsed_ms),
                                            metadata: Some(json!({
                                                "tool_call_id": tool_call_id.clone(),
                                                "tool_name": tool_name.clone(),
                                            })),
                                        });
                                    }
                                }
                                tool_traces.push(YamlToolCallTrace {
                                    id: tool_call_id.clone(),
                                    name: tool_name.clone(),
                                    arguments,
                                    output: None,
                                    status: "error".to_string(),
                                    elapsed_ms,
                                    error: Some(message.clone()),
                                });
                                if let Some(span) = tool_span.take() {
                                    span.end();
                                }
                                return Err(format!("tool '{}' failed: {}", tool_name, message));
                            }
                        };

                        // Enforce the tool's declared output schema when present.
                        if let Some(output_schema) = tool_config.output_schema.as_ref() {
                            validate_schema_instance(output_schema, &tool_output).map_err(
                                |message| {
                                    format!(
                                        "tool '{}' output failed schema validation: {}",
                                        tool_name, message
                                    )
                                },
                            )?;
                        }

                        let elapsed_ms = tool_started.elapsed().as_millis();
                        if let Some(span) = tool_span.as_mut() {
                            span.add_event("workflow.tool.execute.completed");
                            span.set_attribute("tool_status", "ok");
                            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
                            let output_for_span =
                                payload_for_tool_trace(request.tool_trace_mode, &tool_output)
                                    .to_string();
                            span.set_attribute("tool_output", output_for_span.as_str());
                        }
                        if request.tool_trace_mode != YamlToolTraceMode::Off {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_tool_call_completed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(false),
                                    message: Some(format!(
                                        "tool call completed: {}",
                                        tool_name
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: Some(elapsed_ms),
                                    metadata: Some(json!({
                                        "tool_call_id": tool_call_id.clone(),
                                        "tool_name": tool_name.clone(),
                                        "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
                                        "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
                                    })),
                                });
                            }
                        }

                        tool_traces.push(YamlToolCallTrace {
                            id: tool_call_id.clone(),
                            name: tool_name.clone(),
                            arguments: arguments.clone(),
                            output: Some(tool_output.clone()),
                            status: "ok".to_string(),
                            elapsed_ms,
                            error: None,
                        });

                        // Feed the tool's JSON output back into the conversation
                        // so the next completion can use it.
                        conversation.push(Message::tool(
                            serde_json::to_string(&tool_output).map_err(|error| {
                                format!("failed to serialize tool output: {error}")
                            })?,
                            tool_call_id,
                        ));
                        if let Some(span) = tool_span.take() {
                            span.end();
                        }
                    }

                    if request.tool_trace_mode != YamlToolTraceMode::Off {
                        if let Some(sink) = event_sink {
                            sink.emit(&YamlWorkflowEvent {
                                event_type: "node_tool_roundtrip_completed".to_string(),
                                node_id: Some(request.node_id.clone()),
                                step_id: Some(request.node_id.clone()),
                                node_kind: Some("llm_call".to_string()),
                                streamable: Some(false),
                                message: Some(format!(
                                    "tool roundtrip {} completed",
                                    roundtrip + 1
                                )),
                                delta: None,
                                token_kind: None,
                                is_terminal_node_token: None,
                                elapsed_ms: None,
                                metadata: Some(json!({
                                    "roundtrip": roundtrip + 1,
                                    "max_tool_roundtrips": request.max_tool_roundtrips,
                                })),
                            });
                        }
                    }
                }

                // The for loop always returns from inside its body; reaching
                // here means no final payload was ever produced.
                return Err(format!(
                    "tool-enabled llm_call '{}' exhausted loop without final payload",
                    request.node_id
                ));
            }

            // ---- Plain (no-tools) completion path. ----
            let mut builder = CompletionRequest::builder()
                .model(&request.model)
                .messages(messages);

            if request.heal && !request.stream && expects_object {
                builder = builder.json_schema("workflow_step", request.schema.clone());
            }

            if request.stream {
                builder = builder.stream(true);
            }

            let completion_request = builder
                .build()
                .map_err(|error| format!("failed to build completion request: {error}"))?;

            // Healing mode is only engaged for non-streaming object responses.
            let completion_options = if request.heal && !request.stream && expects_object {
                CompletionOptions {
                    mode: CompletionMode::HealedJson,
                }
            } else {
                CompletionOptions::default()
            };

            let outcome = self
                .client
                .complete(&completion_request, completion_options)
                .await
                .map_err(|error| error.to_string())?;

            match outcome {
                CompletionOutcome::Stream(mut stream) => {
                    let mut aggregated = String::new();
                    let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
                    let stream_started = Instant::now();
                    // Time-to-first-token, captured at the first non-empty delta.
                    let mut ttft_ms: Option<u128> = None;
                    let mut delta_filter = StructuredJsonDeltaFilter::default();
                    let include_raw_debug = include_raw_stream_debug_events();
                    let mut json_text_formatter = if request.stream_json_as_text {
                        Some(StreamJsonAsTextFormatter::default())
                    } else {
                        None
                    };
                    while let Some(chunk_result) = stream.next().await {
                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                        let chunk = chunk_result.map_err(|error| error.to_string())?;
                        if let Some(usage) = chunk.usage {
                            final_stream_usage = Some(usage);
                        }
                        if let Some(choice) = chunk.choices.first() {
                            // TTFT fires on either content or reasoning deltas.
                            if ttft_ms.is_none()
                                && (choice
                                    .delta
                                    .content
                                    .as_ref()
                                    .is_some_and(|delta| !delta.is_empty())
                                    || choice
                                        .delta
                                        .reasoning_content
                                        .as_ref()
                                        .is_some_and(|delta| !delta.is_empty()))
                            {
                                ttft_ms = Some(stream_started.elapsed().as_millis());
                            }
                            if include_raw_debug {
                                if let Some(reasoning_delta) =
                                    choice.delta.reasoning_content.as_ref()
                                {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_thinking_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(reasoning_delta.clone()),
                                            token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                            if let Some(delta) = choice.delta.content.clone() {
                                aggregated.push_str(delta.as_str());
                                let (output_delta, thinking_delta) = if expects_object {
                                    delta_filter.split(delta.as_str())
                                } else {
                                    (Some(delta.clone()), None)
                                };
                                let rendered_output_delta = if let Some(output_chunk) = output_delta
                                {
                                    if let Some(formatter) = json_text_formatter.as_mut() {
                                        formatter.push(output_chunk.as_str());
                                        formatter.emit_if_ready(delta_filter.completed())
                                    } else {
                                        Some(output_chunk)
                                    }
                                } else {
                                    None
                                };
                                if include_raw_debug {
                                    if let Some(sink) = event_sink {
                                        if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_thinking_delta"
                                                    .to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_thinking_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Thinking),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                        if let Some(raw_output_delta) =
                                            rendered_output_delta.as_ref()
                                        {
                                            sink.emit(&YamlWorkflowEvent {
                                                event_type: "node_stream_output_delta".to_string(),
                                                node_id: Some(request.node_id.clone()),
                                                step_id: Some(request.node_id.clone()),
                                                node_kind: Some("llm_call".to_string()),
                                                streamable: Some(true),
                                                message: None,
                                                delta: Some(raw_output_delta.clone()),
                                                token_kind: Some(YamlWorkflowTokenKind::Output),
                                                is_terminal_node_token: Some(
                                                    request.is_terminal_node,
                                                ),
                                                elapsed_ms: None,
                                                metadata: None,
                                            });
                                        }
                                    }
                                }
                                if let Some(filtered_delta) = rendered_output_delta {
                                    if let Some(sink) = event_sink {
                                        sink.emit(&YamlWorkflowEvent {
                                            event_type: "node_stream_delta".to_string(),
                                            node_id: Some(request.node_id.clone()),
                                            step_id: Some(request.node_id.clone()),
                                            node_kind: Some("llm_call".to_string()),
                                            streamable: Some(true),
                                            message: None,
                                            delta: Some(filtered_delta),
                                            token_kind: Some(YamlWorkflowTokenKind::Output),
                                            is_terminal_node_token: Some(request.is_terminal_node),
                                            elapsed_ms: None,
                                            metadata: None,
                                        });
                                    }
                                }
                            }
                        }

                        if event_sink_is_cancelled(event_sink) {
                            return Err(workflow_event_sink_cancelled_message().to_string());
                        }
                    }

                    // Parse the aggregated stream; a heal confidence means the
                    // raw text needed repair, so surface a node_healed event.
                    let payload = if expects_object {
                        let resolved =
                            parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
                        if let Some(confidence) = resolved.heal_confidence {
                            if let Some(sink) = event_sink {
                                sink.emit(&YamlWorkflowEvent {
                                    event_type: "node_healed".to_string(),
                                    node_id: Some(request.node_id.clone()),
                                    step_id: Some(request.node_id.clone()),
                                    node_kind: Some("llm_call".to_string()),
                                    streamable: Some(true),
                                    message: Some(format!(
                                        "healed streamed structured response confidence={confidence}"
                                    )),
                                    delta: None,
                                    token_kind: None,
                                    is_terminal_node_token: None,
                                    elapsed_ms: None,
                                    metadata: None,
                                });
                            }
                        }
                        resolved.payload
                    } else {
                        Value::String(aggregated)
                    };

                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
                            prompt_tokens: usage.prompt_tokens,
                            completion_tokens: usage.completion_tokens,
                            total_tokens: usage.total_tokens,
                            reasoning_tokens: usage.reasoning_tokens,
                        }),
                        ttft_ms,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::Response(response) => {
                    let payload = if expects_object {
                        let content = response
                            .content()
                            .ok_or_else(|| "completion returned empty content".to_string())?;
                        serde_json::from_str(content).map_err(|error| {
                            format!("failed to parse structured completion JSON: {error}")
                        })?
                    } else {
                        Value::String(response.content().unwrap_or_default().to_string())
                    };

                    Ok(YamlLlmExecutionResult {
                        payload,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: response.usage.prompt_tokens,
                            completion_tokens: response.usage.completion_tokens,
                            total_tokens: response.usage.total_tokens,
                            reasoning_tokens: response.usage.reasoning_tokens,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::HealedJson(healed) => {
                    // Healing only makes sense for object-shaped schemas.
                    if !expects_object {
                        return Err(
                            "healed json outcome is unsupported for non-object schema".to_string()
                        );
                    }
                    if let Some(sink) = event_sink {
                        sink.emit(&YamlWorkflowEvent {
                            event_type: "node_healed".to_string(),
                            node_id: Some(request.node_id.clone()),
                            step_id: Some(request.node_id.clone()),
                            node_kind: Some("llm_call".to_string()),
                            streamable: Some(request.stream),
                            message: Some(format!(
                                "healed structured response confidence={}",
                                healed.parsed.confidence
                            )),
                            delta: None,
                            token_kind: None,
                            is_terminal_node_token: None,
                            elapsed_ms: None,
                            metadata: None,
                        });
                    }
                    Ok(YamlLlmExecutionResult {
                        payload: healed.parsed.value,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: healed.response.usage.prompt_tokens,
                            completion_tokens: healed.response.usage.completion_tokens,
                            total_tokens: healed.response.usage.total_tokens,
                            reasoning_tokens: healed.response.usage.reasoning_tokens,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
                CompletionOutcome::CoercedSchema(coerced) => {
                    if !expects_object {
                        return Err(
                            "coerced schema outcome is unsupported for non-object schema"
                                .to_string(),
                        );
                    }
                    Ok(YamlLlmExecutionResult {
                        payload: coerced.coerced.value,
                        usage: Some(YamlLlmTokenUsage {
                            prompt_tokens: coerced.response.usage.prompt_tokens,
                            completion_tokens: coerced.response.usage.completion_tokens,
                            total_tokens: coerced.response.usage.total_tokens,
                            reasoning_tokens: coerced.response.usage.reasoning_tokens,
                        }),
                        ttft_ms: None,
                        tool_calls: Vec::new(),
                    })
                }
            }
        }
    }

    // Wrap the borrowed client and delegate to the executor-based runner.
    let executor = BorrowedClientExecutor {
        client,
        custom_worker,
        run_options: options.clone(),
    };
    run_workflow_yaml_with_custom_worker_and_events_and_options(
        workflow,
        workflow_input,
        &executor,
        custom_worker,
        event_sink,
        options,
    )
    .await
}
3185
3186pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
3187 workflow: &YamlWorkflow,
3188 email_text: &str,
3189 client: &SimpleAgentsClient,
3190 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3191 event_sink: Option<&dyn YamlWorkflowEventSink>,
3192) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3193 let workflow_input = json!({ "email_text": email_text });
3194 run_workflow_yaml_with_client_and_custom_worker_and_events(
3195 workflow,
3196 &workflow_input,
3197 client,
3198 custom_worker,
3199 event_sink,
3200 )
3201 .await
3202}
3203
3204pub async fn run_workflow_yaml(
3205 workflow: &YamlWorkflow,
3206 workflow_input: &Value,
3207 executor: &dyn YamlWorkflowLlmExecutor,
3208) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3209 run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3210 .await
3211}
3212
3213pub async fn run_email_workflow_yaml(
3214 workflow: &YamlWorkflow,
3215 email_text: &str,
3216 executor: &dyn YamlWorkflowLlmExecutor,
3217) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3218 let workflow_input = json!({ "email_text": email_text });
3219 run_workflow_yaml(workflow, &workflow_input, executor).await
3220}
3221
3222pub async fn run_workflow_yaml_with_custom_worker(
3223 workflow: &YamlWorkflow,
3224 workflow_input: &Value,
3225 executor: &dyn YamlWorkflowLlmExecutor,
3226 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3227) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3228 run_workflow_yaml_with_custom_worker_and_events(
3229 workflow,
3230 workflow_input,
3231 executor,
3232 custom_worker,
3233 None,
3234 )
3235 .await
3236}
3237
3238pub async fn run_email_workflow_yaml_with_custom_worker(
3239 workflow: &YamlWorkflow,
3240 email_text: &str,
3241 executor: &dyn YamlWorkflowLlmExecutor,
3242 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3243) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3244 let workflow_input = json!({ "email_text": email_text });
3245 run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3246}
3247
3248pub async fn run_workflow_yaml_with_custom_worker_and_events(
3249 workflow: &YamlWorkflow,
3250 workflow_input: &Value,
3251 executor: &dyn YamlWorkflowLlmExecutor,
3252 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3253 event_sink: Option<&dyn YamlWorkflowEventSink>,
3254) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3255 run_workflow_yaml_with_custom_worker_and_events_and_options(
3256 workflow,
3257 workflow_input,
3258 executor,
3259 custom_worker,
3260 event_sink,
3261 &YamlWorkflowRunOptions::default(),
3262 )
3263 .await
3264}
3265
/// Executes a YAML workflow end-to-end with full configurability: an LLM
/// executor, an optional custom-worker executor, an optional event sink for
/// progress/streaming events, and run options (telemetry, tracing, model
/// override).
///
/// High-level flow:
/// 1. Validates that the input is a JSON object, that the telemetry sample
///    rate is valid, and that static verification produced no
///    `Error`-severity diagnostics.
/// 2. Attempts the IR-runtime fast path via `try_run_yaml_via_ir_runtime`;
///    if that path handles the workflow, its output is returned directly.
/// 3. Otherwise interprets the workflow in-process: starting at
///    `workflow.entry_node`, it follows the (single-outgoing-edge) edge map,
///    executing `llm_call`, `switch`, and `custom_worker` nodes, collecting
///    per-node outputs, timings, and token usage, emitting telemetry spans
///    and sink events, and honoring sink cancellation between steps.
///
/// # Errors
/// `YamlWorkflowRunError` variants for: non-object input, validation
/// diagnostics, empty node list, missing entry/target node, unsupported node
/// kind, LLM or custom-worker failure, invalid switch target, and event-sink
/// cancellation (checked before the loop and before/after each node).
pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    event_sink: Option<&dyn YamlWorkflowEventSink>,
    options: &YamlWorkflowRunOptions,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    // The interpreter addresses input fields by key, so the root must be an object.
    if !workflow_input.is_object() {
        return Err(YamlWorkflowRunError::InvalidInput {
            message: "workflow input must be a JSON object".to_string(),
        });
    }

    validate_sample_rate(options.telemetry.sample_rate)?;

    // `email_text` is optional in the input; absent or non-string yields "".
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default();

    // Static verification: any Error-severity diagnostic aborts the run up front.
    let diagnostics = verify_yaml_workflow(workflow);
    let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
        .iter()
        .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
        .cloned()
        .collect();
    if !errors.is_empty() {
        return Err(YamlWorkflowRunError::Validation {
            diagnostics_count: errors.len(),
            diagnostics: errors,
        });
    }

    // Fast path: if the workflow lowers cleanly to the IR runtime, run it there
    // and return its output; `Ok(None)` means "fall back to the interpreter".
    if let Some(output) =
        try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
            .await?
    {
        return Ok(output);
    }

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Root workflow span, created only when this run is sampled.
    // NOTE(review): the early `return Err(...)` paths below exit while this
    // span is still open (it is only `end()`ed at the bottom of the function);
    // confirm the tracer handles dropped/unended spans gracefully.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    if workflow.nodes.is_empty() {
        return Err(YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        });
    }

    // Index nodes by id for O(1) lookup during the walk.
    let node_map: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();
    if !node_map.contains_key(workflow.entry_node.as_str()) {
        return Err(YamlWorkflowRunError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // One outgoing edge per node; a node with no entry here is terminal.
    // NOTE(review): if a node has multiple outgoing edges, later ones
    // overwrite earlier ones in this map — presumably ruled out by
    // verification/IR fallback; confirm.
    let edge_map: HashMap<&str, &str> = workflow
        .edges
        .iter()
        .map(|edge| (edge.from.as_str(), edge.to.as_str()))
        .collect();

    // Mutable walk state: current node id, visited trace, per-node outputs,
    // workflow globals, timing/usage accumulators, and time-to-first-token.
    let mut current = workflow.entry_node.clone();
    let mut trace = Vec::new();
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    let mut globals = serde_json::Map::new();
    let mut step_timings = Vec::new();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    let mut llm_node_models: BTreeMap<String, String> = BTreeMap::new();
    let mut token_totals = YamlTokenTotals::default();
    let mut workflow_ttft_ms: Option<u128> = None;
    let started = Instant::now();

    if let Some(sink) = event_sink {
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_started".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("workflow_id={}", workflow.id)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(0),
            metadata: None,
        });
    }

    // Cooperative cancellation: the sink is polled before the loop and at
    // several points inside each iteration.
    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Main interpreter loop: one iteration per node, following `edge_map`
    // until a node with no outgoing edge (terminal) is reached.
    loop {
        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let node =
            *node_map
                .get(current.as_str())
                .ok_or_else(|| YamlWorkflowRunError::MissingNode {
                    node_id: current.clone(),
                })?;

        trace.push(node.id.clone());
        let step_started = Instant::now();

        // Per-node span (only when sampled); LLM nodes are tagged as
        // Langfuse "generation" observations.
        let mut node_span_context: Option<TraceContext> = None;
        let mut node_span = if telemetry_context.sampled {
            let (span_context, mut span) = tracer.start_span(
                "workflow.node.execute",
                SpanKind::Node,
                workflow_span_context.as_ref(),
            );
            node_span_context = Some(span_context);
            apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
            apply_trace_tenant_attributes(span.as_mut(), options);
            span.set_attribute("node_id", node.id.as_str());
            span.set_attribute("node_kind", node.kind_name());
            if node.kind_name() == "llm_call" {
                span.set_attribute("langfuse.observation.type", "generation");
            }
            Some(span)
        } else {
            None
        };

        // A node is streamable only for LLM calls with stream=true and heal=false
        // (healing requires the full response before parsing).
        let node_streamable = node
            .node_type
            .llm_call
            .as_ref()
            .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
        let workflow_elapsed_before_node_ms = started.elapsed().as_millis();

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_started".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: if node_streamable == Some(false) {
                    Some("Node is not streamable; status events only".to_string())
                } else {
                    None
                },
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(workflow_elapsed_before_node_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        let mut node_usage: Option<YamlLlmTokenUsage> = None;
        let mut node_model_name: Option<String> = None;
        let is_terminal_node = !edge_map.contains_key(node.id.as_str());
        // Dispatch on node kind. Each branch evaluates to `Option<String>`:
        // the id of the next node, or None when this node is terminal.
        let next = if let Some(llm) = &node.node_type.llm_call {
            // --- llm_call node ---
            let prompt_template = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.prompt.as_deref())
                .unwrap_or_default();
            // Template/evaluation context: raw input, prior node outputs, globals.
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            // Optional explicit message list resolved from a context path;
            // otherwise the executor builds messages from the prompt.
            let messages = if let Some(path) = llm.messages_path.as_deref() {
                Some(
                    parse_messages_from_context(path, &context).map_err(|message| {
                        YamlWorkflowRunError::Llm {
                            node_id: node.id.clone(),
                            message,
                        }
                    })?,
                )
            } else {
                None
            };
            let prompt_bindings = collect_template_bindings(prompt_template, &context);
            let prompt = interpolate_template(prompt_template, &context);
            let schema = llm_output_schema_for_node(node);

            let request = YamlLlmExecutionRequest {
                node_id: node.id.clone(),
                is_terminal_node,
                stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
                model: resolve_requested_model(options.model.as_deref(), &llm.model),
                messages,
                append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
                prompt,
                prompt_template: prompt_template.to_string(),
                prompt_bindings,
                schema,
                stream: llm.stream.unwrap_or(false),
                heal: llm.heal.unwrap_or(false),
                tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?,
                tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
                    YamlWorkflowRunError::Llm {
                        node_id: node.id.clone(),
                        message,
                    }
                })?,
                max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
                tool_calls_global_key: llm.tool_calls_global_key.clone(),
                tool_trace_mode: options.telemetry.tool_trace_mode,
                execution_context: context.clone(),
                email_text: email_text.to_string(),
                trace_id: telemetry_context.trace_id.clone(),
                trace_context: node_span_context.clone(),
                tenant_context: options.trace.tenant.clone(),
                trace_sampled: telemetry_context.sampled,
            };

            if let Some(span) = node_span.as_mut() {
                // Record the (payload-mode-filtered) node input on the span.
                let node_input = payload_for_span(options.telemetry.payload_mode, &context);
                span.set_attribute("node_input", node_input.as_str());
                span.set_attribute("langfuse.observation.input", node_input.as_str());
            }

            if let Some(sink) = event_sink {
                // Telemetry event with the fully-resolved request details.
                sink.emit(&YamlWorkflowEvent {
                    event_type: "node_llm_input_resolved".to_string(),
                    node_id: Some(node.id.clone()),
                    step_id: Some(node.id.clone()),
                    node_kind: Some("llm_call".to_string()),
                    streamable: Some(request.stream),
                    message: Some("resolved llm input for telemetry".to_string()),
                    delta: None,
                    token_kind: None,
                    is_terminal_node_token: None,
                    elapsed_ms: Some(started.elapsed().as_millis()),
                    metadata: Some(json!({
                        "model": request.model.clone(),
                        "stream_requested": request.stream,
                        "stream_json_as_text": request.stream_json_as_text,
                        "heal_requested": request.heal,
                        "effective_stream": request.stream,
                        "prompt_template": request.prompt_template.clone(),
                        "prompt": request.prompt.clone(),
                        "schema": request.schema.clone(),
                        "bindings": request.prompt_bindings.clone(),
                        "tools_count": request.tools.len(),
                        "max_tool_roundtrips": request.max_tool_roundtrips,
                    })),
                });
            }

            node_model_name = Some(request.model.clone());
            llm_node_models.insert(node.id.clone(), request.model.clone());

            if event_sink_is_cancelled(event_sink) {
                return Err(YamlWorkflowRunError::EventSinkCancelled {
                    message: workflow_event_sink_cancelled_message().to_string(),
                });
            }

            let llm_result = executor
                .complete_structured(request, event_sink)
                .await
                .map_err(|message| YamlWorkflowRunError::Llm {
                    node_id: node.id.clone(),
                    message,
                })?;

            if let Some(usage) = llm_result.usage.as_ref() {
                token_totals.add_usage(usage);
            }
            // Workflow-level TTFT is taken from the first node that reports one,
            // offset by the time spent before that node started.
            if workflow_ttft_ms.is_none() {
                workflow_ttft_ms = llm_result
                    .ttft_ms
                    .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
            }
            node_usage = llm_result.usage;

            let payload = llm_result.payload;
            let tool_calls = llm_result.tool_calls;

            // Node output shape: { "output": <payload>, "tool_calls": [...]? }.
            let mut node_output = json!({ "output": payload });
            if !tool_calls.is_empty() {
                if let Some(output_obj) = node_output.as_object_mut() {
                    output_obj.insert("tool_calls".to_string(), json!(tool_calls));
                }
            }
            outputs.insert(node.id.clone(), node_output);
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    let node_output =
                        payload_for_span(options.telemetry.payload_mode, output_payload);
                    span.set_attribute("node_output", node_output.as_str());
                    span.set_attribute("langfuse.observation.output", node_output.as_str());
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            // Optionally mirror this node's tool calls into a named global.
            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if let Some(node_tool_calls) = outputs
                    .get(node.id.as_str())
                    .and_then(|value| value.get("tool_calls"))
                    .cloned()
                {
                    globals.insert(global_key.clone(), node_tool_calls);
                }
            }
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else if let Some(switch) = &node.node_type.switch {
            // --- switch node: first matching branch wins, else the default ---
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });
            let mut chosen = Some(switch.default.clone());
            for branch in &switch.branches {
                if evaluate_switch_condition(branch.condition.as_str(), &context)? {
                    chosen = Some(branch.target.clone());
                    break;
                }
            }
            // NOTE(review): `chosen` is initialized to Some(default), so this
            // ok_or_else branch is currently unreachable.
            let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
                node_id: node.id.clone(),
            })?;
            Some(chosen)
        } else if let Some(custom) = &node.node_type.custom_worker {
            // --- custom_worker node: delegate to the caller-supplied executor,
            // or fall back to a mock output when none is provided ---
            let payload = node
                .config
                .as_ref()
                .and_then(|cfg| cfg.payload.as_ref())
                .cloned()
                .unwrap_or_else(|| json!({}));
            let context = json!({
                "input": workflow_input,
                "nodes": outputs,
                "globals": Value::Object(globals.clone())
            });

            if let Some(span) = node_span.as_mut() {
                span.set_attribute("handler_name", custom.handler.as_str());
                let node_input = payload_for_span(options.telemetry.payload_mode, &payload);
                span.set_attribute("node_input", node_input.as_str());
                span.set_attribute("langfuse.observation.input", node_input.as_str());
            }

            // Dedicated child span for the handler invocation itself.
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if telemetry_context.sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    workflow_span_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                apply_trace_identity_attributes(
                    span.as_mut(),
                    telemetry_context.trace_id.as_deref(),
                );
                span.set_attribute("handler_name", custom.handler.as_str());
                apply_trace_tenant_attributes(span.as_mut(), options);
                Some(span)
            } else {
                None
            };

            // Propagate trace + tenant identity into the worker's context.
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                telemetry_context.trace_id.as_deref(),
                options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &options.trace.tenant,
            );

            let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
                custom_worker_executor
                    .execute(
                        custom.handler.as_str(),
                        &payload,
                        email_text,
                        &worker_context,
                    )
                    .await
                    .map_err(|message| YamlWorkflowRunError::CustomWorker {
                        node_id: node.id.clone(),
                        message,
                    })
            } else {
                mock_custom_worker_output(custom.handler.as_str(), &payload)
            };

            // End the handler span before propagating any error, so it is
            // closed even when the worker failed.
            if let Some(span) = handler_span.take() {
                span.end();
            }

            let worker_output = worker_output_result?;

            outputs.insert(node.id.clone(), json!({ "output": worker_output }));
            if let Some(span) = node_span.as_mut() {
                if let Some(output_payload) = outputs.get(node.id.as_str()) {
                    let node_output =
                        payload_for_span(options.telemetry.payload_mode, output_payload);
                    span.set_attribute("node_output", node_output.as_str());
                    span.set_attribute("langfuse.observation.output", node_output.as_str());
                }
            }
            apply_set_globals(node, &outputs, workflow_input, &mut globals);
            apply_update_globals(node, &outputs, workflow_input, &mut globals);
            edge_map
                .get(node.id.as_str())
                .map(|value| value.to_string())
        } else {
            return Err(YamlWorkflowRunError::UnsupportedNodeType {
                node_id: node.id.clone(),
            });
        };

        // Record per-step timing (token fields only populated for LLM nodes).
        let node_kind = node.kind_name().to_string();
        let elapsed_ms = step_started.elapsed().as_millis();
        step_timings.push(YamlStepTiming {
            node_id: node.id.clone(),
            node_kind,
            model_name: node_model_name.clone(),
            elapsed_ms,
            prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
            completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
            total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
            reasoning_tokens: node_usage.as_ref().and_then(|usage| usage.reasoning_tokens),
            tokens_per_second: node_usage
                .as_ref()
                .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
        });

        if let Some(usage) = node_usage.as_ref() {
            llm_node_metrics.insert(
                node.id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(
                        usage.completion_tokens,
                        elapsed_ms,
                    ),
                },
            );
        }

        // Finalize the node span with model/usage/timing attributes.
        if let Some(mut span) = node_span.take() {
            if let Some(model_name) = node_model_name.as_deref() {
                span.set_attribute("langfuse.observation.model.name", model_name);
                span.set_attribute("gen_ai.request.model", model_name);
            }
            if let Some(usage) = node_usage.as_ref() {
                apply_langfuse_observation_usage_attributes(span.as_mut(), usage);
            }
            span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
            span.add_event("node_completed");
            span.end();
        }

        if let Some(sink) = event_sink {
            sink.emit(&YamlWorkflowEvent {
                event_type: "node_completed".to_string(),
                node_id: Some(node.id.clone()),
                step_id: Some(node.id.clone()),
                node_kind: Some(node.kind_name().to_string()),
                streamable: node_streamable,
                message: None,
                delta: None,
                token_kind: None,
                is_terminal_node_token: None,
                elapsed_ms: Some(elapsed_ms),
                metadata: None,
            });
        }

        if event_sink_is_cancelled(event_sink) {
            return Err(YamlWorkflowRunError::EventSinkCancelled {
                message: workflow_event_sink_cancelled_message().to_string(),
            });
        }

        // Advance to the next node, or exit the loop at a terminal node.
        if let Some(next) = next {
            current = next;
            continue;
        }
        break;
    }

    // The last visited node is the terminal node; its "output" field is the
    // workflow's terminal output.
    let terminal_node = trace
        .last()
        .cloned()
        .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
            workflow_id: workflow.id.clone(),
        })?;

    let terminal_output = outputs
        .get(terminal_node.as_str())
        .and_then(|value| value.get("output"))
        .cloned();

    let total_elapsed_ms = started.elapsed().as_millis();
    let output = YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text: email_text.to_string(),
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        ttft_ms: workflow_ttft_ms,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        // Metadata is attached only when a trace id was resolved.
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    };

    if let Some(sink) = event_sink {
        // Optional "nerdstats" payload on the completion event.
        let event_metadata = if options.telemetry.nerdstats {
            Some(json!({
                "nerdstats": workflow_nerdstats(&output),
            }))
        } else {
            None
        };
        sink.emit(&YamlWorkflowEvent {
            event_type: "workflow_completed".to_string(),
            node_id: None,
            step_id: None,
            node_kind: None,
            streamable: None,
            message: Some(format!("terminal_node={}", output.terminal_node)),
            delta: None,
            token_kind: None,
            is_terminal_node_token: None,
            elapsed_ms: Some(output.total_elapsed_ms),
            metadata: event_metadata,
        });
    }

    if event_sink_is_cancelled(event_sink) {
        return Err(YamlWorkflowRunError::EventSinkCancelled {
            message: workflow_event_sink_cancelled_message().to_string(),
        });
    }

    // Close the root workflow span and flush the tracer before returning.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
        apply_langfuse_trace_input_output_attributes(
            span.as_mut(),
            workflow_input,
            &output,
            options.telemetry.payload_mode,
        );
        apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
        span.end();
        flush_workflow_tracer();
    }

    Ok(output)
}
3881
3882async fn try_run_yaml_via_ir_runtime(
3883 workflow: &YamlWorkflow,
3884 workflow_input: &Value,
3885 executor: &dyn YamlWorkflowLlmExecutor,
3886 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3887 options: &YamlWorkflowRunOptions,
3888) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
3889 let ir = match yaml_workflow_to_ir(workflow) {
3890 Ok(def) => def,
3891 Err(YamlToIrError::UnsupportedNode { .. })
3892 | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
3893 Err(err) => {
3894 return Err(YamlWorkflowRunError::InvalidInput {
3895 message: err.to_string(),
3896 });
3897 }
3898 };
3899
3900 if validate_and_normalize(&ir).is_err() {
3901 return Ok(None);
3902 }
3903
3904 let parent_trace_context = trace_context_from_options(options);
3905 let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3906
3907 let tracer = workflow_tracer();
3908 let mut workflow_span_context: Option<TraceContext> = None;
3909 let mut workflow_span = if telemetry_context.sampled {
3910 let (span_context, mut span) = tracer.start_span(
3911 "workflow.run",
3912 SpanKind::Workflow,
3913 parent_trace_context.as_ref(),
3914 );
3915 apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
3916 apply_trace_tenant_attributes(span.as_mut(), options);
3917 workflow_span_context = Some(span_context);
3918 Some(span)
3919 } else {
3920 None
3921 };
3922
3923 struct NoopLlm;
3924 #[async_trait]
3925 impl LlmExecutor for NoopLlm {
3926 async fn execute(
3927 &self,
3928 _input: LlmExecutionInput,
3929 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3930 Err(LlmExecutionError::UnexpectedOutcome(
3931 "yaml_ir_uses_tool_path",
3932 ))
3933 }
3934 }
3935
3936 struct YamlIrToolExecutor<'a> {
3937 llm_executor: &'a dyn YamlWorkflowLlmExecutor,
3938 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
3939 token_totals: std::sync::Mutex<YamlTokenTotals>,
3940 node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
3941 node_models: std::sync::Mutex<BTreeMap<String, String>>,
3942 model_override: Option<String>,
3943 trace_id: Option<String>,
3944 trace_context: Option<TraceContext>,
3945 trace_input_context: Option<YamlWorkflowTraceContextInput>,
3946 tenant_context: YamlWorkflowTraceTenantContext,
3947 payload_mode: YamlWorkflowPayloadMode,
3948 trace_sampled: bool,
3949 }
3950
3951 #[async_trait]
3952 impl ToolExecutor for YamlIrToolExecutor<'_> {
3953 async fn execute_tool(
3954 &self,
3955 input: ToolExecutionInput,
3956 ) -> Result<Value, ToolExecutionError> {
3957 let context = build_yaml_context_from_ir_scope(&input.scoped_input);
3958
3959 if input.tool == YAML_LLM_TOOL_ID {
3960 let node_id = input
3961 .input
3962 .get("node_id")
3963 .and_then(Value::as_str)
3964 .ok_or_else(|| {
3965 ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
3966 })?
3967 .to_string();
3968 let node_id_for_metrics = node_id.clone();
3969 let model = input
3970 .input
3971 .get("model")
3972 .and_then(Value::as_str)
3973 .ok_or_else(|| {
3974 ToolExecutionError::Failed("yaml llm call missing model".to_string())
3975 })?
3976 .to_string();
3977 let resolved_model =
3978 resolve_requested_model(self.model_override.as_deref(), &model);
3979 let prompt_template = input
3980 .input
3981 .get("prompt_template")
3982 .and_then(Value::as_str)
3983 .unwrap_or_default()
3984 .to_string();
3985 let stream = input
3986 .input
3987 .get("stream")
3988 .and_then(Value::as_bool)
3989 .unwrap_or(false);
3990 let heal = input
3991 .input
3992 .get("heal")
3993 .and_then(Value::as_bool)
3994 .unwrap_or(false);
3995 let append_prompt_as_user = input
3996 .input
3997 .get("append_prompt_as_user")
3998 .and_then(Value::as_bool)
3999 .unwrap_or(true);
4000 let messages_path = input
4001 .input
4002 .get("messages_path")
4003 .and_then(Value::as_str)
4004 .map(str::to_string);
4005
4006 let messages = if let Some(path) = messages_path.as_deref() {
4007 Some(
4008 parse_messages_from_context(path, &context)
4009 .map_err(ToolExecutionError::Failed)?,
4010 )
4011 } else {
4012 None
4013 };
4014
4015 let prompt_bindings = collect_template_bindings(&prompt_template, &context);
4016 let prompt = interpolate_template(&prompt_template, &context);
4017 let email_text = context
4018 .get("input")
4019 .and_then(|v| v.get("email_text"))
4020 .and_then(Value::as_str)
4021 .unwrap_or_default();
4022 let schema = input
4023 .input
4024 .get("output_schema")
4025 .cloned()
4026 .unwrap_or_else(default_llm_output_schema);
4027
4028 let request = YamlLlmExecutionRequest {
4029 node_id,
4030 is_terminal_node: false,
4031 stream_json_as_text: input
4032 .input
4033 .get("stream_json_as_text")
4034 .and_then(Value::as_bool)
4035 .unwrap_or(false),
4036 model: resolved_model.clone(),
4037 messages,
4038 append_prompt_as_user,
4039 prompt,
4040 prompt_template,
4041 prompt_bindings,
4042 schema,
4043 stream,
4044 heal,
4045 tools: Vec::new(),
4046 tool_choice: None,
4047 max_tool_roundtrips: 1,
4048 tool_calls_global_key: None,
4049 tool_trace_mode: YamlToolTraceMode::Off,
4050 execution_context: context.clone(),
4051 email_text: email_text.to_string(),
4052 trace_id: self.trace_id.clone(),
4053 trace_context: self.trace_context.clone(),
4054 tenant_context: self.tenant_context.clone(),
4055 trace_sampled: self.trace_sampled,
4056 };
4057
4058 let llm_result = self
4059 .llm_executor
4060 .complete_structured(request, None)
4061 .await
4062 .map_err(ToolExecutionError::Failed);
4063
4064 if let Ok(ref result) = llm_result {
4065 if let Some(usage) = result.usage.as_ref() {
4066 if let Ok(mut totals) = self.token_totals.lock() {
4067 totals.add_usage(usage);
4068 }
4069 if let Ok(mut usage_map) = self.node_usage.lock() {
4070 usage_map.insert(node_id_for_metrics.clone(), usage.clone());
4071 }
4072 }
4073 if let Ok(mut model_map) = self.node_models.lock() {
4074 model_map.insert(node_id_for_metrics, resolved_model);
4075 }
4076 }
4077
4078 return llm_result.map(|result| result.payload);
4079 }
4080
4081 let worker = self
4082 .custom_worker
4083 .ok_or_else(|| ToolExecutionError::NotFound {
4084 tool: input.tool.clone(),
4085 })?;
4086
4087 let payload = input.input.clone();
4088 let email_text = context
4089 .get("input")
4090 .and_then(|v| v.get("email_text"))
4091 .and_then(Value::as_str)
4092 .unwrap_or_default();
4093
4094 let tracer = workflow_tracer();
4095 let mut handler_span_context: Option<TraceContext> = None;
4096 let mut handler_span = if self.trace_sampled {
4097 let (span_context, mut span) = tracer.start_span(
4098 "handler.invoke",
4099 SpanKind::Node,
4100 self.trace_context.as_ref(),
4101 );
4102 handler_span_context = Some(span_context);
4103 apply_trace_identity_attributes(span.as_mut(), self.trace_id.as_deref());
4104 span.set_attribute("handler_name", input.tool.as_str());
4105 apply_trace_tenant_attributes_from_tenant(span.as_mut(), &self.tenant_context);
4106 span.set_attribute(
4107 "node_input",
4108 payload_for_span(self.payload_mode, &payload).as_str(),
4109 );
4110 Some(span)
4111 } else {
4112 None
4113 };
4114
4115 let trace_options = YamlWorkflowRunOptions {
4116 telemetry: YamlWorkflowTelemetryConfig::default(),
4117 trace: YamlWorkflowTraceOptions {
4118 context: self.trace_input_context.clone(),
4119 tenant: self.tenant_context.clone(),
4120 },
4121 model: None,
4122 };
4123 let worker_trace_context = merged_trace_context_for_worker(
4124 handler_span_context.as_ref(),
4125 self.trace_id.as_deref(),
4126 &trace_options,
4127 );
4128 let worker_context = custom_worker_context_with_trace(
4129 &context,
4130 &worker_trace_context,
4131 &self.tenant_context,
4132 );
4133
4134 let output_result = worker
4135 .execute(&input.tool, &payload, email_text, &worker_context)
4136 .await
4137 .map_err(ToolExecutionError::Failed);
4138
4139 if let Some(span) = handler_span.as_mut() {
4140 if output_result.is_ok() {
4141 span.add_event("handler.success");
4142 } else {
4143 span.add_event("handler.error");
4144 }
4145 }
4146
4147 if let Some(span) = handler_span.take() {
4148 span.end();
4149 }
4150
4151 output_result
4152 }
4153 }
4154
4155 let tool_executor = YamlIrToolExecutor {
4156 llm_executor: executor,
4157 custom_worker,
4158 token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
4159 node_usage: std::sync::Mutex::new(BTreeMap::new()),
4160 node_models: std::sync::Mutex::new(BTreeMap::new()),
4161 model_override: options.model.clone(),
4162 trace_id: telemetry_context.trace_id.clone(),
4163 trace_context: workflow_span_context.clone(),
4164 trace_input_context: options.trace.context.clone(),
4165 tenant_context: options.trace.tenant.clone(),
4166 payload_mode: options.telemetry.payload_mode,
4167 trace_sampled: telemetry_context.sampled,
4168 };
4169
4170 let runtime_options = WorkflowRuntimeOptions {
4171 validate_before_run: false,
4172 ..WorkflowRuntimeOptions::default()
4173 };
4174 let runtime = WorkflowRuntime::new(ir, &NoopLlm, Some(&tool_executor), runtime_options);
4175
4176 let started = Instant::now();
4177 let result = match runtime.execute(workflow_input.clone(), None).await {
4178 Ok(result) => result,
4179 Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
4180 Err(error) => {
4181 return Err(YamlWorkflowRunError::IrRuntime {
4182 message: error.to_string(),
4183 });
4184 }
4185 };
4186 let total_elapsed_ms = started.elapsed().as_millis();
4187
4188 let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
4189 for (node_id, output) in result.node_outputs {
4190 if node_id == YAML_START_NODE_ID {
4191 continue;
4192 }
4193 outputs.insert(node_id, json!({"output": output}));
4194 }
4195
4196 let mut trace = Vec::new();
4197 let mut step_timings = Vec::new();
4198 let node_usage_map = tool_executor
4199 .node_usage
4200 .lock()
4201 .map(|usage| usage.clone())
4202 .unwrap_or_default();
4203 let llm_node_models = tool_executor
4204 .node_models
4205 .lock()
4206 .map(|models| models.clone())
4207 .unwrap_or_default();
4208 let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
4209 for execution in result.node_executions {
4210 if execution.node_id == YAML_START_NODE_ID {
4211 continue;
4212 }
4213 let node_id = execution.node_id;
4214 trace.push(node_id.clone());
4215 let usage = node_usage_map.get(&node_id);
4216 if let Some(usage) = usage {
4217 llm_node_metrics.insert(
4218 node_id.clone(),
4219 YamlLlmNodeMetrics {
4220 elapsed_ms: 0,
4221 prompt_tokens: usage.prompt_tokens,
4222 completion_tokens: usage.completion_tokens,
4223 total_tokens: usage.total_tokens,
4224 reasoning_tokens: usage.reasoning_tokens,
4225 tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
4226 },
4227 );
4228 }
4229 step_timings.push(YamlStepTiming {
4230 node_id: node_id.clone(),
4231 node_kind: "ir_runtime".to_string(),
4232 model_name: llm_node_models.get(&node_id).cloned(),
4233 elapsed_ms: 0,
4234 prompt_tokens: usage.map(|value| value.prompt_tokens),
4235 completion_tokens: usage.map(|value| value.completion_tokens),
4236 total_tokens: usage.map(|value| value.total_tokens),
4237 reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
4238 tokens_per_second: usage
4239 .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
4240 });
4241 }
4242
4243 let terminal_node = result.terminal_node_id;
4244 let terminal_output = outputs
4245 .get(&terminal_node)
4246 .and_then(|v| v.get("output"))
4247 .cloned();
4248
4249 let email_text = workflow_input
4250 .get("email_text")
4251 .and_then(Value::as_str)
4252 .unwrap_or_default()
4253 .to_string();
4254
4255 let token_totals = tool_executor
4256 .token_totals
4257 .lock()
4258 .map(|totals| totals.clone())
4259 .unwrap_or_default();
4260
4261 let output = YamlWorkflowRunOutput {
4262 workflow_id: workflow.id.clone(),
4263 entry_node: workflow.entry_node.clone(),
4264 email_text,
4265 trace,
4266 outputs,
4267 terminal_node,
4268 terminal_output,
4269 step_timings,
4270 llm_node_metrics,
4271 llm_node_models,
4272 total_elapsed_ms,
4273 ttft_ms: None,
4274 total_input_tokens: token_totals.input_tokens,
4275 total_output_tokens: token_totals.output_tokens,
4276 total_tokens: token_totals.total_tokens,
4277 total_reasoning_tokens: token_totals.reasoning_tokens,
4278 tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
4279 trace_id: telemetry_context.trace_id.clone(),
4280 metadata: telemetry_context.trace_id.as_ref().map(|value| {
4281 workflow_metadata_with_trace(
4282 options,
4283 value,
4284 telemetry_context.sampled,
4285 telemetry_context.trace_id_source,
4286 )
4287 }),
4288 };
4289
4290 if let Some(mut span) = workflow_span.take() {
4291 span.set_attribute("workflow_id", workflow.id.as_str());
4292 apply_trace_identity_attributes(span.as_mut(), telemetry_context.trace_id.as_deref());
4293 apply_langfuse_trace_input_output_attributes(
4294 span.as_mut(),
4295 workflow_input,
4296 &output,
4297 options.telemetry.payload_mode,
4298 );
4299 apply_langfuse_nerdstats_attributes(span.as_mut(), &output, options.telemetry.nerdstats);
4300 span.end();
4301 flush_workflow_tracer();
4302 }
4303
4304 Ok(Some(output))
4305}
4306
4307fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4308 let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4309
4310 let mut nodes = serde_json::Map::new();
4311 if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4312 for (node_id, output) in node_outputs {
4313 nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4314 }
4315 }
4316
4317 json!({
4318 "input": input,
4319 "nodes": Value::Object(nodes),
4320 "globals": Value::Object(serde_json::Map::new())
4321 })
4322}
4323
4324pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4325 workflow: &YamlWorkflow,
4326 email_text: &str,
4327 executor: &dyn YamlWorkflowLlmExecutor,
4328 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4329 event_sink: Option<&dyn YamlWorkflowEventSink>,
4330) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4331 let workflow_input = json!({ "email_text": email_text });
4332 run_workflow_yaml_with_custom_worker_and_events(
4333 workflow,
4334 &workflow_input,
4335 executor,
4336 custom_worker,
4337 event_sink,
4338 )
4339 .await
4340}
4341
4342fn evaluate_switch_condition(
4343 condition: &str,
4344 context: &Value,
4345) -> Result<bool, YamlWorkflowRunError> {
4346 let (left, right) =
4347 condition
4348 .split_once("==")
4349 .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4350 condition: condition.to_string(),
4351 })?;
4352
4353 let left_path = left.trim().trim_start_matches("$.");
4354 let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4355 let left_value = resolve_path(context, left_path);
4356 Ok(left_value
4357 .and_then(Value::as_str)
4358 .map(|value| value == right_literal)
4359 .unwrap_or(false))
4360}
4361
4362fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4363 let normalized_path = path.trim().trim_start_matches("$.");
4364 let value = resolve_path(context, normalized_path)
4365 .ok_or_else(|| format!("messages_path not found: {path}"))?;
4366 let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4367 format!("messages_path must resolve to a list of messages: {path}; {err}")
4368 })?;
4369 if list.is_empty() {
4370 return Err(format!(
4371 "messages_path must not resolve to an empty list: {path}"
4372 ));
4373 }
4374
4375 let mut messages = Vec::with_capacity(list.len());
4376 for (index, item) in list.into_iter().enumerate() {
4377 let mut message = match item.role {
4378 Role::System => Message::system(item.content),
4379 Role::User => Message::user(item.content),
4380 Role::Assistant => Message::assistant(item.content),
4381 Role::Tool => {
4382 let tool_call_id = item
4383 .tool_call_id
4384 .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4385 Message::tool(item.content, tool_call_id)
4386 }
4387 };
4388
4389 if let Some(name) = item.name {
4390 message = message.with_name(name);
4391 }
4392
4393 messages.push(message);
4394 }
4395
4396 Ok(messages)
4397}
4398
/// Statically validates a parsed YAML workflow and returns the collected
/// diagnostics; an empty vector means every check here passed.
///
/// Checks performed:
/// - the entry node and all edge endpoints reference existing node ids
/// - `llm_call` nodes: non-empty model; `stream`+`heal` conflict (warning);
///   `max_tool_roundtrips >= 1`; non-empty `tool_calls_global_key`;
///   `tool_choice` referencing a declared tool; declarations matching
///   `tools_format`; unique, non-empty tool names; valid input/output schemas
/// - `switch` nodes: branch targets and the default target exist
/// - `config.update_globals`: known op, and `from` present when required
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
    let mut diagnostics = Vec::new();
    // Index nodes by id for O(1) existence checks below.
    let known_ids: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();

    if !known_ids.contains_key(workflow.entry_node.as_str()) {
        diagnostics.push(YamlWorkflowDiagnostic {
            node_id: None,
            code: "missing_entry".to_string(),
            severity: YamlWorkflowDiagnosticSeverity::Error,
            message: format!("entry node '{}' does not exist", workflow.entry_node),
        });
    }

    // Both endpoints of every edge must name a declared node.
    for edge in &workflow.edges {
        if !known_ids.contains_key(edge.from.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.from.clone()),
                code: "unknown_edge_from".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.from '{}' does not exist", edge.from),
            });
        }
        if !known_ids.contains_key(edge.to.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.to.clone()),
                code: "unknown_edge_to".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.to '{}' does not exist", edge.to),
            });
        }
    }

    for node in &workflow.nodes {
        if let Some(llm) = &node.node_type.llm_call {
            if llm.model.trim().is_empty() {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "empty_model".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.model must not be empty".to_string(),
                });
            }
            // stream + heal together is a soft conflict: the runtime falls
            // back to non-streaming, so this is only a warning.
            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "stream_heal_conflict".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Warning,
                    message:
                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
                            .to_string(),
                });
            }

            // Default roundtrip budget is 1; an explicit 0 is always invalid.
            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "invalid_max_tool_roundtrips".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
                });
            }

            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if global_key.trim().is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_calls_global_key".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
                    });
                }
            }

            // A tool_choice that pins a specific function must name one of
            // the tools declared on this node (matching the declared format).
            match normalize_tool_choice(llm.tool_choice.clone()) {
                Ok(choice) => {
                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
                                openai.function.name == choice_tool.function.name
                            }
                            (
                                YamlToolFormat::Simplified,
                                YamlToolDeclaration::Simplified(simple),
                            ) => simple.name == choice_tool.function.name,
                            _ => false,
                        }) {
                            diagnostics.push(YamlWorkflowDiagnostic {
                                node_id: Some(node.id.clone()),
                                code: "unknown_tool_choice_function".to_string(),
                                severity: YamlWorkflowDiagnosticSeverity::Error,
                                message: format!(
                                    "llm_call.tool_choice references unknown function '{}'",
                                    choice_tool.function.name
                                ),
                            });
                        }
                    }
                }
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_choice".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                }
            }

            // On a format mismatch the tool list is treated as empty so the
            // per-tool checks below are skipped rather than double-reported.
            let normalized_tools = match normalize_llm_tools(llm) {
                Ok(tools) => tools,
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tools_format".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                    Vec::new()
                }
            };

            let mut seen_tool_names = HashSet::new();
            for tool in &normalized_tools {
                let name = tool.definition.function.name.trim();
                if name.is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "tool function name must not be empty".to_string(),
                    });
                }
                // NOTE(review): the duplicate check uses the untrimmed name,
                // so names differing only in surrounding whitespace are not
                // flagged as duplicates — confirm that is intended.
                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "duplicate_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "duplicate tool function name '{}' in node",
                            tool.definition.function.name
                        ),
                    });
                }

                // A missing input schema and an invalid one get distinct codes.
                let schema = tool
                    .definition
                    .function
                    .parameters
                    .clone()
                    .unwrap_or(Value::Null);
                if schema.is_null() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "missing_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' is missing input schema",
                            tool.definition.function.name
                        ),
                    });
                } else if let Err(message) = validate_json_schema(&schema) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' has invalid input schema: {}",
                            tool.definition.function.name, message
                        ),
                    });
                }

                // The output schema is optional; validate only when present.
                if let Some(output_schema) = tool.output_schema.as_ref() {
                    if let Err(message) = validate_json_schema(output_schema) {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "invalid_tool_output_schema".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "tool '{}' has invalid output schema: {}",
                                tool.definition.function.name, message
                            ),
                        });
                    }
                }
            }
        }

        // Switch nodes: every branch target and the default must exist.
        if let Some(switch) = &node.node_type.switch {
            for branch in &switch.branches {
                if !known_ids.contains_key(branch.target.as_str()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "unknown_switch_target".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!("switch branch target '{}' does not exist", branch.target),
                    });
                }
            }
            if !known_ids.contains_key(switch.default.as_str()) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "unknown_switch_default".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: format!("switch default target '{}' does not exist", switch.default),
                });
            }
        }

        // update_globals entries: op must be one of the four supported kinds,
        // and every op except "increment" requires a 'from' source path.
        if let Some(config) = node.config.as_ref() {
            if let Some(update_globals) = config.update_globals.as_ref() {
                for (key, update) in update_globals {
                    let is_valid_op =
                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
                    if !is_valid_op {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "unknown_update_op".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
                                key, update.op
                            ),
                        });
                    }

                    if update.op != "increment" && update.from.is_none() {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "missing_update_from".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' with op '{}' requires 'from'",
                                key, update.op
                            ),
                        });
                    }
                }
            }
        }
    }

    diagnostics
}
4647
4648fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4649 path.split('.')
4650 .filter(|segment| !segment.is_empty())
4651 .try_fold(value, |current, segment| {
4652 if let Ok(index) = segment.parse::<usize>() {
4653 return current.get(index);
4654 }
4655 current.get(segment)
4656 })
4657}
4658
4659fn interpolate_template(template: &str, context: &Value) -> String {
4660 let mut out = String::with_capacity(template.len());
4661 let mut rest = template;
4662
4663 loop {
4664 let Some(start) = rest.find("{{") else {
4665 out.push_str(rest);
4666 break;
4667 };
4668
4669 out.push_str(&rest[..start]);
4670 let after_start = &rest[start + 2..];
4671 let Some(end) = after_start.find("}}") else {
4672 out.push_str(&rest[start..]);
4673 break;
4674 };
4675
4676 let expr = after_start[..end].trim();
4677 let source_path = expr.trim_start_matches("$.");
4678 let replacement = resolve_path(context, source_path)
4679 .map(value_to_template_string)
4680 .unwrap_or_default();
4681 out.push_str(replacement.as_str());
4682
4683 rest = &after_start[end + 2..];
4684 }
4685
4686 out
4687}
4688
4689fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4690 let mut bindings = Vec::new();
4691 let mut rest = template;
4692
4693 loop {
4694 let Some(start) = rest.find("{{") else {
4695 break;
4696 };
4697
4698 let after_start = &rest[start + 2..];
4699 let Some(end) = after_start.find("}}") else {
4700 break;
4701 };
4702
4703 let expr = after_start[..end].trim();
4704 let source_path = expr.trim_start_matches("$.").to_string();
4705 let resolved = resolve_path(context, source_path.as_str()).cloned();
4706 let missing = resolved.is_none();
4707 let resolved_value = resolved.unwrap_or(Value::Null);
4708 bindings.push(YamlTemplateBinding {
4709 index: bindings.len(),
4710 expression: expr.to_string(),
4711 source_path,
4712 resolved_type: json_type_name(&resolved_value).to_string(),
4713 missing,
4714 resolved: resolved_value,
4715 });
4716
4717 rest = &after_start[end + 2..];
4718 }
4719
4720 bindings
4721}
4722
4723fn json_type_name(value: &Value) -> &'static str {
4724 match value {
4725 Value::Null => "null",
4726 Value::Bool(_) => "bool",
4727 Value::Number(_) => "number",
4728 Value::String(_) => "string",
4729 Value::Array(_) => "array",
4730 Value::Object(_) => "object",
4731 }
4732}
4733
4734fn value_to_template_string(value: &Value) -> String {
4735 match value {
4736 Value::Null => String::new(),
4737 Value::Bool(v) => v.to_string(),
4738 Value::Number(v) => v.to_string(),
4739 Value::String(v) => v.clone(),
4740 Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4741 }
4742}
4743
4744fn apply_set_globals(
4745 node: &YamlNode,
4746 outputs: &BTreeMap<String, Value>,
4747 workflow_input: &Value,
4748 globals: &mut serde_json::Map<String, Value>,
4749) {
4750 let Some(config) = node.config.as_ref() else {
4751 return;
4752 };
4753 let Some(set_globals) = config.set_globals.as_ref() else {
4754 return;
4755 };
4756
4757 let context = json!({
4758 "input": workflow_input,
4759 "nodes": outputs,
4760 "globals": Value::Object(globals.clone())
4761 });
4762
4763 for (key, expr) in set_globals {
4764 let value = resolve_path(&context, expr.as_str())
4765 .cloned()
4766 .unwrap_or(Value::Null);
4767 globals.insert(key.clone(), value);
4768 }
4769}
4770
/// Applies a node's `config.update_globals` operations to the shared
/// `globals` map.
///
/// Supported ops:
/// - "set": overwrite the key from a resolved `from` path
/// - "append": push onto an array, promoting a non-array existing value to a
///   two-element array
/// - "increment": numeric add (`by` defaults to 1.0; a missing or non-numeric
///   current value counts as 0.0)
/// - "merge": shallow object merge; non-object sources are ignored
///
/// `from` paths for set/append/merge resolve against a `{input, nodes,
/// globals}` snapshot taken before any update in this call; "increment"
/// reads the live `globals` map instead. Unknown ops are silently skipped
/// here (`verify_yaml_workflow` reports them as diagnostics).
fn apply_update_globals(
    node: &YamlNode,
    outputs: &BTreeMap<String, Value>,
    workflow_input: &Value,
    globals: &mut serde_json::Map<String, Value>,
) {
    let Some(config) = node.config.as_ref() else {
        return;
    };
    let Some(update_globals) = config.update_globals.as_ref() else {
        return;
    };

    // Snapshot of globals taken before any writes in this call.
    let context = json!({
        "input": workflow_input,
        "nodes": outputs,
        "globals": Value::Object(globals.clone())
    });

    for (key, update) in update_globals {
        match update.op.as_str() {
            "set" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    globals.insert(key.clone(), value);
                }
            }
            "append" => {
                if let Some(path) = update.from.as_ref() {
                    let value = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    let entry = globals
                        .entry(key.clone())
                        .or_insert_with(|| Value::Array(Vec::new()));
                    match entry {
                        Value::Array(items) => items.push(value),
                        // Existing scalar/object: promote it to an array
                        // containing the old value followed by the new one.
                        other => {
                            let existing = other.clone();
                            *other = Value::Array(vec![existing, value]);
                        }
                    }
                }
            }
            "increment" => {
                let by = update.by.unwrap_or(1.0);
                let current = globals
                    .get(key.as_str())
                    .and_then(Value::as_f64)
                    .unwrap_or(0.0);
                // from_f64 returns None for NaN/infinite results, in which
                // case the key is left unchanged.
                if let Some(next) = serde_json::Number::from_f64(current + by) {
                    globals.insert(key.clone(), Value::Number(next));
                }
            }
            "merge" => {
                if let Some(path) = update.from.as_ref() {
                    let source = resolve_path(&context, path.as_str())
                        .cloned()
                        .unwrap_or(Value::Null);
                    if let Value::Object(source_map) = source {
                        let target = globals
                            .entry(key.clone())
                            .or_insert_with(|| Value::Object(serde_json::Map::new()));
                        if let Value::Object(target_map) = target {
                            // Shallow merge: source keys overwrite target keys.
                            target_map.extend(source_map);
                        } else {
                            // Existing non-object value is replaced wholesale.
                            *target = Value::Object(source_map);
                        }
                    }
                }
            }
            // Unknown ops are ignored; validation reports them separately.
            _ => {}
        }
    }
}
4848
4849fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4850 if let Some(schema) = node
4851 .config
4852 .as_ref()
4853 .and_then(|cfg| cfg.output_schema.clone())
4854 {
4855 return schema;
4856 }
4857
4858 default_llm_output_schema()
4859}
4860
4861fn normalize_tool_choice(
4862 config: Option<YamlToolChoiceConfig>,
4863) -> Result<Option<ToolChoice>, String> {
4864 let Some(config) = config else {
4865 return Ok(None);
4866 };
4867
4868 let choice = match config {
4869 YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4870 YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4871 tool_type: ToolType::Function,
4872 function: ToolChoiceFunction { name: function },
4873 }),
4874 YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4875 };
4876
4877 Ok(Some(choice))
4878}
4879
4880fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4881 llm.tools
4882 .iter()
4883 .map(|tool| match (llm.tools_format, tool) {
4884 (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4885 let definition = ToolDefinition {
4886 tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4887 function: ToolFunction {
4888 name: openai.function.name.clone(),
4889 description: openai.function.description.clone(),
4890 parameters: openai.function.parameters.clone(),
4891 },
4892 };
4893 Ok(YamlResolvedTool {
4894 definition,
4895 output_schema: openai.function.output_schema.clone(),
4896 })
4897 }
4898 (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4899 let definition = ToolDefinition {
4900 tool_type: ToolType::Function,
4901 function: ToolFunction {
4902 name: simple.name.clone(),
4903 description: simple.description.clone(),
4904 parameters: Some(simple.input_schema.clone()),
4905 },
4906 };
4907 Ok(YamlResolvedTool {
4908 definition,
4909 output_schema: simple.output_schema.clone(),
4910 })
4911 }
4912 (YamlToolFormat::Openai, _) => {
4913 Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4914 }
4915 (YamlToolFormat::Simplified, _) => {
4916 Err("tools_format=simplified requires simplified tool declarations".to_string())
4917 }
4918 })
4919 .collect()
4920}
4921
4922fn default_llm_output_schema() -> Value {
4923 json!({
4924 "type": "object",
4925 "additionalProperties": true
4926 })
4927}
4928
4929fn mock_rag(topic: &str) -> Value {
4930 let (kb_source, playbook) = match topic {
4931 "probation" => (
4932 "hr_policy/probation.md",
4933 "Collect manager review, performance evidence, and probation timeline.",
4934 ),
4935 "leave_request" => (
4936 "hr_policy/leave.md",
4937 "Validate leave balance, manager approval, and blackout dates.",
4938 ),
4939 "supply_chain_order_assessment" => (
4940 "supply_chain/order_assessment.md",
4941 "Review order specs, inventory risk, and vendor lead-time guidance.",
4942 ),
4943 "supply_chain_order_replacement" => (
4944 "supply_chain/order_replacement.md",
4945 "Collect order id, damage proof, and replacement SLA policy.",
4946 ),
4947 "termination_first_time_offense" => (
4948 "hr_policy/termination_first_offense.md",
4949 "Validate first-incident criteria and route to HRBP review.",
4950 ),
4951 "termination_repeated_offense" => (
4952 "hr_policy/termination_repeated_offense.md",
4953 "Collect prior warnings and escalation approvals before final action.",
4954 ),
4955 _ => (
4956 "shared/request_clarification.md",
4957 "Request clarifying details before routing.",
4958 ),
4959 };
4960
4961 json!({
4962 "kb_source": kb_source,
4963 "playbook": playbook,
4964 })
4965}
4966
/// Executes a `run_workflow_graph` tool call by loading and running another
/// YAML workflow file as a subworkflow.
///
/// Requires `payload.workflow_id` and a `workflow_registry` map
/// (workflow_id -> yaml path) inside `context.input`. The subworkflow input
/// starts from `payload.input` and inherits `messages`, `email_text`, and the
/// registry from the parent unless the payload supplies its own. Recursion is
/// bounded via the `__subgraph_depth`/`__subgraph_max_depth` input keys
/// (max depth defaults to 3). Trace identity is propagated through
/// `build_subworkflow_options`. Returns a summary object with the
/// subworkflow's terminal node/output and trace; all failures are reported as
/// `Err(String)` for the tool layer.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    // The registry maps workflow ids to on-disk YAML paths.
    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Depth bookkeeping lives in the input object itself so it travels
    // through nested subworkflow invocations.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    // Start from the payload-provided input (if any)...
    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    // ...then inherit parent inputs the payload did not override.
    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Always stamp the incremented depth and the (possibly defaulted) cap.
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    // No event sink is forwarded to subworkflows.
    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
5076
5077fn build_subworkflow_options(
5078 parent_options: &YamlWorkflowRunOptions,
5079 parent_trace_context: Option<&TraceContext>,
5080 resolved_trace_id: Option<&str>,
5081) -> YamlWorkflowRunOptions {
5082 let mut subworkflow_options = parent_options.clone();
5083 if parent_trace_context.is_some() || resolved_trace_id.is_some() {
5084 let trace_context = YamlWorkflowTraceContextInput {
5085 trace_id: resolved_trace_id
5086 .map(|value| value.to_string())
5087 .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
5088 span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
5089 parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
5090 traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
5091 tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
5092 baggage: parent_trace_context
5093 .map(|ctx| ctx.baggage.clone())
5094 .unwrap_or_default(),
5095 };
5096 subworkflow_options.trace.context = Some(trace_context);
5097 }
5098 subworkflow_options
5099}
5100
5101fn mock_custom_worker_output(
5102 handler: &str,
5103 payload: &Value,
5104) -> Result<Value, YamlWorkflowRunError> {
5105 if handler == "get_employee_record" {
5106 let employee_name = payload
5107 .get("employee_name")
5108 .and_then(Value::as_str)
5109 .unwrap_or("Unknown Employee")
5110 .trim();
5111 let normalized_name = if employee_name.is_empty() {
5112 "Unknown Employee"
5113 } else {
5114 employee_name
5115 };
5116
5117 let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
5118 "alex johnson" => ("EMP-2041", "Austin"),
5119 "priya sharma" => ("EMP-3378", "Bengaluru"),
5120 "marcus lee" => ("EMP-1196", "Singapore"),
5121 "sarah chen" => ("EMP-4450", "Toronto"),
5122 _ => ("EMP-0000", "Unassigned"),
5123 };
5124
5125 return Ok(json!({
5126 "employee_name": normalized_name,
5127 "employee_id": employee_id,
5128 "location": location,
5129 }));
5130 }
5131
5132 if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
5133 let mut value = mock_rag(topic);
5134 if let Value::Object(object) = &mut value {
5135 object.insert("handler".to_string(), Value::String(handler.to_string()));
5136 }
5137 return Ok(value);
5138 }
5139
5140 Err(YamlWorkflowRunError::UnsupportedCustomHandler {
5141 handler: handler.to_string(),
5142 })
5143}
5144
/// Top-level YAML workflow definition as deserialized from a workflow file.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Unique workflow identifier.
    pub id: String,
    /// Id of the node where execution starts; must match a node in `nodes`.
    pub entry_node: String,
    /// Declared nodes; may be empty in the YAML (defaults to no nodes).
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Directed edges between node ids; may be empty in the YAML.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}

/// A single workflow node: an id, exactly-one-of node type, and optional config.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Unique node identifier referenced by edges and switch targets.
    pub id: String,
    /// Which kind of node this is (llm_call / switch / custom_worker).
    pub node_type: YamlNodeType,
    /// Optional per-node configuration (prompt, schemas, globals updates).
    pub config: Option<YamlNodeConfig>,
}
5161
5162impl YamlNode {
5163 fn kind_name(&self) -> &'static str {
5164 if self.node_type.llm_call.is_some() {
5165 "llm_call"
5166 } else if self.node_type.switch.is_some() {
5167 "switch"
5168 } else if self.node_type.custom_worker.is_some() {
5169 "custom_worker"
5170 } else {
5171 "unknown"
5172 }
5173 }
5174}
5175
5176#[derive(Debug, Clone, Deserialize)]
5177pub struct YamlNodeType {
5178 pub llm_call: Option<YamlLlmCall>,
5179 pub switch: Option<YamlSwitch>,
5180 pub custom_worker: Option<YamlCustomWorker>,
5181}
5182
5183#[derive(Debug, Clone, Deserialize)]
5184pub struct YamlLlmCall {
5185 pub model: String,
5186 pub stream: Option<bool>,
5187 pub stream_json_as_text: Option<bool>,
5188 pub heal: Option<bool>,
5189 pub messages_path: Option<String>,
5190 pub append_prompt_as_user: Option<bool>,
5191 #[serde(default)]
5192 pub tools_format: YamlToolFormat,
5193 #[serde(default)]
5194 pub tools: Vec<YamlToolDeclaration>,
5195 pub tool_choice: Option<YamlToolChoiceConfig>,
5196 pub max_tool_roundtrips: Option<u8>,
5197 pub tool_calls_global_key: Option<String>,
5198}
5199
5200#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
5201#[serde(rename_all = "snake_case")]
5202pub enum YamlToolFormat {
5203 #[default]
5204 Openai,
5205 Simplified,
5206}
5207
5208#[derive(Debug, Clone, Deserialize)]
5209#[serde(untagged)]
5210pub enum YamlToolDeclaration {
5211 OpenAi(YamlOpenAiToolDeclaration),
5212 Simplified(YamlSimplifiedToolDeclaration),
5213}
5214
/// OpenAI-style tool declaration: `{ type: ..., function: { ... } }`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    // YAML key is "type"; optional since OpenAI tools default to "function".
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}
5221
/// Function payload of an OpenAI-style tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    // JSON Schema describing the tool's arguments, when provided.
    pub parameters: Option<Value>,
    // Optional schema for the tool's result.
    pub output_schema: Option<Value>,
}
5229
/// Simplified (flat) tool declaration: name/description plus schemas.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    // Required argument schema — this is what distinguishes the simplified
    // shape from the OpenAI shape for `untagged` deserialization.
    pub input_schema: Value,
    pub output_schema: Option<Value>,
}
5237
/// Tool-choice setting; `untagged` so YAML may provide a bare mode value,
/// a `{ function: name }` shorthand, or a full OpenAI tool-choice object.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    Mode(ToolChoiceMode),
    Function { function: String },
    OpenAi(ToolChoiceTool),
}
5245
/// Switch node: routes to a branch target when its condition matches,
/// otherwise to `default`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    // Target node id used when no branch condition matches.
    pub default: String,
}
5252
/// One switch branch: a condition expression and the node id it routes to.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    // Condition expression evaluated against accumulated node outputs
    // (tests use JSONPath-style comparisons like
    // `$.nodes.<id>.output.category == "..."`).
    pub condition: String,
    pub target: String,
}
5258
/// Custom-worker node: names the host-registered handler to invoke.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}
5263
/// Optional per-node configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    // Prompt template for llm_call nodes; `{{ ... }}` bindings are resolved
    // from the workflow input (see the binding telemetry tests).
    pub prompt: Option<String>,
    // Output schema; YAML may also use the shorter `schema` key.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    // Static payload forwarded to custom workers.
    pub payload: Option<Value>,
    // Globals set verbatim before/after the node runs — exact timing is
    // enforced by the runtime, not visible here.
    pub set_globals: Option<HashMap<String, String>>,
    // Structured global updates keyed by global name.
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}
5273
/// A single global-variable update instruction.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    // Operation name; which of `from`/`by` applies depends on the op
    // (interpreted by the runtime, not visible in this chunk).
    pub op: String,
    pub from: Option<String>,
    pub by: Option<f64>,
}
5280
/// Directed edge between two node ids.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
5286
5287#[cfg(test)]
5288mod tests {
5289 use super::*;
5290 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5291 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5292 use simple_agent_type::tool::{ToolCallFunction, ToolType};
5293 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5294 use simple_agents_core::SimpleAgentsClientBuilder;
5295 use std::collections::BTreeMap;
5296 use std::fs;
5297 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5298 use std::sync::{Arc, Mutex, OnceLock};
5299
5300 fn stream_debug_env_lock() -> &'static Mutex<()> {
5301 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5302 LOCK.get_or_init(|| Mutex::new(()))
5303 }
5304
    // Executor stub whose responses are keyed off prompt contents (impl below).
    struct MockExecutor;

    // Event sink that records every emitted event for later inspection.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    // Event sink that reports cancellation after the first emitted event.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }
5314
    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        // Any emitted event flips the cancellation flag.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }
5324
    // Executor that counts how many structured completions it serves.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    // Custom worker that captures the context value it was invoked with.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    // Mock providers exercising tool loops, unknown tools, and reasoning
    // token usage — behavior defined in the `Provider` impls below.
    struct ToolLoopProvider;

    struct UnknownToolProvider;

    struct ReasoningUsageProvider;

    struct ToolLoopReasoningProvider;

    // Span that records the attributes set on it, for tracing assertions.
    #[derive(Default)]
    struct CapturingSpan {
        attributes: BTreeMap<String, String>,
    }
5345
    impl WorkflowSpan for CapturingSpan {
        // Store each attribute so tests can assert on it later.
        fn set_attribute(&mut self, key: &str, value: &str) {
            self.attributes.insert(key.to_string(), value.to_string());
        }

        // Events and span end are no-ops for this fixture.
        fn add_event(&mut self, _name: &str) {}

        fn end(self: Box<Self>) {}
    }
5355
    // Mock provider simulating a two-round tool loop:
    //   round 1 (tools present, no tool result yet)  -> returns one tool call;
    //   round 2 (a Role::Tool message is in history) -> returns final JSON;
    //   no tools at all                              -> echoes the last user prompt.
    #[async_trait]
    impl Provider for ToolLoopProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so `execute` can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            // A Role::Tool message means a tool result was already fed back.
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // First round: request the `get_customer_context` tool.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Second round: produce the final structured payload.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // No tools: echo the most recent user message back as JSON.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5451
    // Mock provider that always answers with a call to a tool the workflow
    // never declared ("unknown_tool"), to exercise the unknown-tool path.
    #[async_trait]
    impl Provider for UnknownToolProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so `execute` can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let response = CompletionResponse {
                id: "resp_unknown_tool".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("").with_tool_calls(vec![ToolCall {
                        id: "call_unknown".to_string(),
                        tool_type: ToolType::Function,
                        function: ToolCallFunction {
                            name: "unknown_tool".to_string(),
                            arguments: "{}".to_string(),
                        },
                    }]),
                    finish_reason: FinishReason::ToolCalls,
                    logprobs: None,
                }],
                usage: Usage::new(5, 2),
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5497
    // Mock provider that reports reasoning tokens in its usage block, so
    // tests can verify reasoning-token accounting.
    #[async_trait]
    impl Provider for ReasoningUsageProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so `execute` can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            // Fixed usage: 9 prompt / 5 completion, 4 of them reasoning tokens.
            let mut usage = Usage::new(9, 5);
            usage.reasoning_tokens = Some(4);
            let response = CompletionResponse {
                id: "resp_reasoning".to_string(),
                model: request.model,
                choices: vec![CompletionChoice {
                    index: 0,
                    message: Message::assistant("{\"state\":\"ok\"}"),
                    finish_reason: FinishReason::Stop,
                    logprobs: None,
                }],
                usage,
                created: None,
                provider: Some(self.name().to_string()),
                healing_metadata: None,
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5538
    // Like ToolLoopProvider, but every response carries reasoning tokens in
    // its usage so reasoning accumulation across tool rounds can be asserted.
    #[async_trait]
    impl Provider for ToolLoopReasoningProvider {
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request through JSON so `execute` can inspect it.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
        }

        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            // A Role::Tool message means a tool result was already fed back.
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // First round: tool call with 2 reasoning tokens.
                let mut usage = Usage::new(10, 5);
                usage.reasoning_tokens = Some(2);
                CompletionResponse {
                    id: "resp_tool_reasoning_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Final round: terminal JSON with 3 reasoning tokens.
                let mut usage = Usage::new(12, 6);
                usage.reasoning_tokens = Some(3);
                CompletionResponse {
                    id: "resp_tool_reasoning_2".to_string(),
                    model: request.model,
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage,
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5611
    // Custom worker that always returns a fixed payload.
    struct FixedToolWorker {
        payload: Value,
    }

    // Custom worker that counts its invocations.
    struct CountingToolWorker {
        execute_calls: AtomicUsize,
    }
5619
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
        // Ignores all inputs and returns a clone of the configured payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            Ok(self.payload.clone())
        }
    }
5632
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
        // Counts the call and returns a trivial success payload.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            _context: &Value,
        ) -> Result<Value, String> {
            self.execute_calls.fetch_add(1, Ordering::SeqCst);
            Ok(json!({"ok": true}))
        }
    }
5646
    #[async_trait]
    impl YamlWorkflowLlmExecutor for CountingExecutor {
        // Counts the call and returns a fixed payload with no usage metrics.
        async fn complete_structured(
            &self,
            _request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: None,
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
5663
    impl YamlWorkflowEventSink for RecordingSink {
        // Appends a clone of every event; tests inspect the vector afterwards.
        fn emit(&self, event: &YamlWorkflowEvent) {
            self.events
                .lock()
                .expect("recording sink lock should not be poisoned")
                .push(event.clone());
        }
    }
5672
    #[async_trait]
    impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
        // Stores the context it was called with so tests can assert on it.
        async fn execute(
            &self,
            _handler: &str,
            _payload: &Value,
            _email_text: &str,
            context: &Value,
        ) -> Result<Value, String> {
            let mut guard = self
                .context
                .lock()
                .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
            *guard = Some(context.clone());
            Ok(json!({"ok": true}))
        }
    }
5690
    #[async_trait]
    impl YamlWorkflowLlmExecutor for MockExecutor {
        // Dispatches canned classification payloads based on which known
        // phrase appears in the prompt; errors on anything unexpected.
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            let prompt = request.prompt;
            // Top-level classification prompt -> "termination" (10/5/15 tokens).
            if prompt.contains("exactly one category") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"category":"termination","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 10,
                        completion_tokens: 5,
                        total_tokens: 15,
                        reasoning_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Termination subtype prompt -> "repeated_offense" (12/6/18 tokens).
            if prompt.contains("Determine termination subtype") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"subtype":"repeated_offense","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 12,
                        completion_tokens: 6,
                        total_tokens: 18,
                        reasoning_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            // Supply-chain subtype prompt -> "order_replacement" (11/4/15 tokens).
            if prompt.contains("Determine supply chain subtype") {
                return Ok(YamlLlmExecutionResult {
                    payload: json!({"subtype":"order_replacement","reason":"mock"}),
                    usage: Some(YamlLlmTokenUsage {
                        prompt_tokens: 11,
                        completion_tokens: 4,
                        total_tokens: 15,
                        reasoning_tokens: None,
                    }),
                    ttft_ms: None,
                    tool_calls: Vec::new(),
                });
            }
            Err("unexpected prompt".to_string())
        }
    }
5741
    // End-to-end: a classify -> switch -> subtype -> switch -> worker graph
    // routes to the termination branch and reports per-step timings and
    // token totals (10+12 prompt, 5+6 completion across two LLM calls).
    #[tokio::test]
    async fn runs_yaml_workflow_and_returns_step_timings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Determine termination subtype:
        {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
    config:
      payload:
        topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
            .await
            .expect("yaml workflow should execute");

        assert_eq!(output.workflow_id, "email-intake-classification");
        assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
        assert!(!output.step_timings.is_empty());
        // Every traced node should have a corresponding timing entry.
        assert_eq!(output.step_timings.len(), output.trace.len());
        assert!(output
            .outputs
            .contains_key("rag_termination_repeated_offense"));
        assert_eq!(output.total_input_tokens, 22);
        assert_eq!(output.total_output_tokens, 11);
        assert_eq!(output.total_tokens, 33);
    }
5815
    // Verifies the `node_llm_input_resolved` telemetry event: it must carry
    // model/stream/heal metadata, the fully resolved prompt, and a bindings
    // array describing how each `{{ ... }}` placeholder was resolved.
    #[tokio::test]
    async fn emits_resolved_llm_input_event_with_bindings() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_email_workflow_yaml_with_custom_worker_and_events(
            &workflow,
            "Need help with termination",
            &MockExecutor,
            None,
            Some(&sink),
        )
        .await
        .expect("yaml workflow should execute");

        assert_eq!(output.terminal_node, "classify_top_level");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let llm_event = events
            .iter()
            .find(|event| event.event_type == "node_llm_input_resolved")
            .expect("expected llm input telemetry event");

        let metadata = llm_event
            .metadata
            .as_ref()
            .expect("llm input event must include metadata");
        // Stream/heal were not requested in the YAML, so both default to false.
        assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
        assert_eq!(metadata["stream_requested"], Value::Bool(false));
        assert_eq!(metadata["heal_requested"], Value::Bool(false));
        assert!(metadata["prompt"]
            .as_str()
            .expect("prompt should be a string")
            .contains("Need help with termination"));

        // The single {{ input.email_text }} placeholder yields one binding.
        let bindings = metadata["bindings"]
            .as_array()
            .expect("bindings should be an array");
        assert_eq!(bindings.len(), 1);
        assert_eq!(
            bindings[0]["source_path"],
            Value::String("input.email_text".to_string())
        );
        assert_eq!(
            bindings[0]["resolved"],
            Value::String("Need help with termination".to_string())
        );
        assert_eq!(bindings[0]["missing"], Value::Bool(false));
        assert_eq!(
            bindings[0]["resolved_type"],
            Value::String("string".to_string())
        );
    }
5888
    // With default run options, the `workflow_completed` event must embed a
    // `nerdstats` object (per-step details + totals) sourced from provider
    // usage, without the legacy `step_timings`/`llm_node_metrics` keys.
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_by_default() {
        let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // Totals must mirror the run output.
        assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
        assert_eq!(
            nerdstats["terminal_node"],
            Value::String(output.terminal_node)
        );
        assert_eq!(
            nerdstats["total_tokens"],
            Value::Number(output.total_tokens.into())
        );
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_usage".to_string())
        );
        // Only the consolidated `step_details` list is emitted.
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert!(nerdstats.get("step_details").is_some());
        assert!(nerdstats.get("llm_node_models").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
        // MockExecutor never reports time-to-first-token.
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
    }
5965
    // Disabling `telemetry.nerdstats` must drop metadata entirely from the
    // `workflow_completed` event.
    #[tokio::test]
    async fn workflow_completed_event_omits_nerdstats_when_disabled() {
        let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
    config:
      prompt: |
        Classify this email into exactly one category:
        {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };
        // Same defaults as the previous test, except nerdstats is off.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                nerdstats: false,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            Some(&sink),
            &options,
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        assert!(completed.metadata.is_none());
    }
6015
    // Executor that reports a ttft only when streaming was requested (impl below).
    struct StreamAwareMockExecutor;
6017
    #[async_trait]
    impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
        // Returns fixed usage (20/10/30); ttft_ms is Some(12) only when the
        // request asked for streaming.
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            Ok(YamlLlmExecutionResult {
                payload: json!({"state":"ok"}),
                usage: Some(YamlLlmTokenUsage {
                    prompt_tokens: 20,
                    completion_tokens: 10,
                    total_tokens: 30,
                    reasoning_tokens: None,
                }),
                ttft_ms: if request.stream { Some(12) } else { None },
                tool_calls: Vec::new(),
            })
        }
    }
6038
    // With a streaming llm_call node, nerdstats must still carry token totals
    // (the stream-aware executor reports usage) plus the streaming ttft.
    #[tokio::test]
    async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
        let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
    config:
      prompt: |
        Return JSON only:
        {"state":"ok"}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let sink = RecordingSink {
            events: Mutex::new(Vec::new()),
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &StreamAwareMockExecutor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        let events = sink
            .events
            .lock()
            .expect("recording sink lock should not be poisoned");
        let completed = events
            .iter()
            .find(|event| event.event_type == "workflow_completed")
            .expect("expected workflow_completed event");
        let metadata = completed
            .metadata
            .as_ref()
            .expect("workflow_completed should include metadata by default");
        let nerdstats = metadata
            .get("nerdstats")
            .expect("nerdstats should be present by default");

        // Usage came from the executor (30 total) and ttft from streaming (12ms).
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
        assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
        assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("classify".to_string())
        );
    }
6102
    // An llm_call step with no token counts at all must flag token metrics as
    // unavailable with source "provider_stream_usage_unavailable" and null totals.
    #[test]
    fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
        // Hand-built run output: one LLM step whose usage fields are all None.
        let output = YamlWorkflowRunOutput {
            workflow_id: "workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["llm_node".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "llm_node".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "llm_node".to_string(),
                node_kind: "llm_call".to_string(),
                model_name: Some("gpt-4.1".to_string()),
                elapsed_ms: 100,
                prompt_tokens: None,
                completion_tokens: None,
                total_tokens: None,
                reasoning_tokens: None,
                tokens_per_second: None,
            }],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: None,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_tokens: 0,
            total_reasoning_tokens: None,
            tokens_per_second: 0.0,
            trace_id: Some("trace-1".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
        assert_eq!(
            nerdstats["token_metrics_source"],
            Value::String("provider_stream_usage_unavailable".to_string())
        );
        // Totals are nulled rather than reported as misleading zeros.
        assert_eq!(nerdstats["total_tokens"], Value::Null);
        assert_eq!(nerdstats["ttft_ms"], Value::Null);
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("llm_node".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
    }
6156
    // When the run output carries a ttft, nerdstats surfaces it verbatim.
    #[test]
    fn workflow_nerdstats_includes_ttft_when_available() {
        // Hand-built run output: one LLM step with full usage and ttft 42ms.
        let output = YamlWorkflowRunOutput {
            workflow_id: "workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["llm_node".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "llm_node".to_string(),
            terminal_output: None,
            step_timings: vec![YamlStepTiming {
                node_id: "llm_node".to_string(),
                node_kind: "llm_call".to_string(),
                model_name: Some("gpt-4.1".to_string()),
                elapsed_ms: 100,
                prompt_tokens: Some(10),
                completion_tokens: Some(15),
                total_tokens: Some(25),
                reasoning_tokens: None,
                tokens_per_second: Some(150.0),
            }],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: Some(42),
            total_input_tokens: 10,
            total_output_tokens: 15,
            total_tokens: 25,
            total_reasoning_tokens: None,
            tokens_per_second: 150.0,
            trace_id: Some("trace-2".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
        assert!(nerdstats.get("step_timings").is_none());
        assert!(nerdstats.get("llm_node_metrics").is_none());
        assert_eq!(
            nerdstats["step_details"][0]["node_id"],
            Value::String("llm_node".to_string())
        );
        assert_eq!(
            nerdstats["step_details"][0]["model_name"],
            Value::String("gpt-4.1".to_string())
        );
    }
6204
    // Pins the full nerdstats JSON shape against a golden value so downstream
    // consumers of the telemetry schema get a loud failure on any change.
    #[test]
    fn workflow_nerdstats_schema_contract_is_stable() {
        // Hand-built run output with one LLM step and one switch step.
        let output = YamlWorkflowRunOutput {
            workflow_id: "schema-workflow".to_string(),
            entry_node: "start".to_string(),
            email_text: "hello".to_string(),
            trace: vec!["classify".to_string(), "route".to_string()],
            outputs: BTreeMap::new(),
            terminal_node: "route".to_string(),
            terminal_output: None,
            step_timings: vec![
                YamlStepTiming {
                    node_id: "classify".to_string(),
                    node_kind: "llm_call".to_string(),
                    model_name: Some("gpt-4.1".to_string()),
                    elapsed_ms: 100,
                    prompt_tokens: Some(11),
                    completion_tokens: Some(22),
                    total_tokens: Some(33),
                    reasoning_tokens: Some(7),
                    tokens_per_second: Some(220.0),
                },
                YamlStepTiming {
                    node_id: "route".to_string(),
                    node_kind: "switch".to_string(),
                    model_name: None,
                    elapsed_ms: 0,
                    prompt_tokens: None,
                    completion_tokens: None,
                    total_tokens: None,
                    reasoning_tokens: None,
                    tokens_per_second: None,
                },
            ],
            llm_node_metrics: BTreeMap::new(),
            llm_node_models: BTreeMap::new(),
            total_elapsed_ms: 100,
            ttft_ms: Some(9),
            total_input_tokens: 11,
            total_output_tokens: 22,
            total_tokens: 33,
            total_reasoning_tokens: Some(7),
            tokens_per_second: 220.0,
            trace_id: Some("trace-schema".to_string()),
            metadata: None,
        };

        let nerdstats = workflow_nerdstats(&output);
        // Golden value: None-valued optional fields (e.g. the switch step's
        // token counts) are skipped entirely, not serialized as null.
        let expected = json!({
            "workflow_id": "schema-workflow",
            "terminal_node": "route",
            "total_elapsed_ms": 100,
            "ttft_ms": 9,
            "step_details": [
                {
                    "node_id": "classify",
                    "node_kind": "llm_call",
                    "model_name": "gpt-4.1",
                    "elapsed_ms": 100,
                    "prompt_tokens": 11,
                    "completion_tokens": 22,
                    "total_tokens": 33,
                    "reasoning_tokens": 7,
                    "tokens_per_second": 220.0
                },
                {
                    "node_id": "route",
                    "node_kind": "switch",
                    "elapsed_ms": 0
                }
            ],
            "total_input_tokens": 11,
            "total_output_tokens": 22,
            "total_tokens": 33,
            "total_reasoning_tokens": 7,
            "tokens_per_second": 220.0,
            "trace_id": "trace-schema",
            "token_metrics_available": true,
            "token_metrics_source": "provider_usage",
            "llm_nodes_without_usage": []
        });

        assert_eq!(nerdstats, expected);
    }
6289
    // Executor that validates it received a two-message chat history (impl below).
    struct MessageHistoryExecutor;
6291
    #[async_trait]
    impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
        // Requires exactly the two messages supplied via `messages_path`
        // (no appended prompt), then returns a fixed classification.
        async fn complete_structured(
            &self,
            request: YamlLlmExecutionRequest,
            _event_sink: Option<&dyn YamlWorkflowEventSink>,
        ) -> Result<YamlLlmExecutionResult, String> {
            let messages = request
                .messages
                .ok_or_else(|| "expected messages in request".to_string())?;
            if messages.len() != 2 {
                return Err(format!("expected 2 messages, got {}", messages.len()));
            }
            Ok(YamlLlmExecutionResult {
                payload: json!({"category":"termination","reason":"history"}),
                usage: Some(YamlLlmTokenUsage {
                    prompt_tokens: 7,
                    completion_tokens: 3,
                    total_tokens: 10,
                    reasoning_tokens: None,
                }),
                ttft_ms: None,
                tool_calls: Vec::new(),
            })
        }
    }
6318
    /// When `messages_path` points at a chat-history array in the workflow
    /// input, the executor should receive those messages rather than a single
    /// prompt-rendered user message.
    #[tokio::test]
    async fn supports_messages_path_in_workflow_input() {
        let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // `email_text` is present but should be ignored in favor of `messages`.
        let input = json!({
            "email_text": "ignored",
            "messages": [
                {"role": "system", "content": "You are a classifier"},
                {"role": "user", "content": "Please classify this"}
            ]
        });

        // MessageHistoryExecutor errors out unless exactly two messages arrive,
        // so a successful run proves the history was forwarded.
        let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
            .await
            .expect("workflow should use chat history from input");

        assert_eq!(output.terminal_node, "classify_top_level");
        assert_eq!(
            output.outputs["classify_top_level"]["output"]["reason"],
            Value::String("history".to_string())
        );
    }
6352
6353 #[tokio::test]
6354 async fn wrapper_entrypoints_produce_equivalent_outputs() {
6355 let yaml = r#"
6356id: wrapper-equivalence
6357entry_node: classify
6358nodes:
6359 - id: classify
6360 node_type:
6361 llm_call:
6362 model: gpt-4.1
6363 config:
6364 prompt: |
6365 Classify this email into exactly one category:
6366 {{ input.email_text }}
6367"#;
6368
6369 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6370 let input = json!({"email_text":"hello"});
6371
6372 let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
6373 .await
6374 .expect("base entrypoint should execute");
6375 let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
6376 .await
6377 .expect("custom worker wrapper should execute");
6378 let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
6379 &workflow,
6380 &input,
6381 &MockExecutor,
6382 None,
6383 None,
6384 &YamlWorkflowRunOptions::default(),
6385 )
6386 .await
6387 .expect("events/options wrapper should execute");
6388
6389 assert_eq!(a.workflow_id, b.workflow_id);
6390 assert_eq!(a.workflow_id, c.workflow_id);
6391 assert_eq!(a.terminal_node, b.terminal_node);
6392 assert_eq!(a.terminal_node, c.terminal_node);
6393 assert_eq!(a.outputs, b.outputs);
6394 assert_eq!(a.outputs, c.outputs);
6395 assert_eq!(a.total_tokens, b.total_tokens);
6396 assert_eq!(a.total_tokens, c.total_tokens);
6397 }
6398
    /// End-to-end tool-calling run: the first node executes a tool roundtrip,
    /// publishes the tool results under `globals.audit` (via
    /// `tool_calls_global_key`), and a downstream node templates a value out
    /// of that global.
    #[tokio::test]
    async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
        let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ToolLoopProvider drives the model side of the tool roundtrip.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // Fixed worker output that satisfies the tool's output_schema.
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
        // Tool call results are recorded on the node output.
        assert_eq!(
            output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
            Value::String("Ava".to_string())
        );
        // The second node's prompt referenced globals.audit, so the tool's
        // customer name should flow into its generated body.
        let body = output.outputs["personalize"]["output"]["body"]
            .as_str()
            .expect("body should be string");
        assert!(body.contains("Ava"));
    }
6481
    /// Reasoning-token usage reported by the provider must survive into both
    /// the workflow output (totals and per-step timings) and the nerdstats
    /// JSON view.
    #[tokio::test]
    async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
        let yaml = r#"
id: reasoning-usage-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ReasoningUsageProvider reports reasoning tokens in its usage block.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ReasoningUsageProvider))
            .build()
            .expect("client should build");

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            None,
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.total_reasoning_tokens, Some(4));
        assert_eq!(output.step_timings.len(), 1);
        assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));

        // The derived nerdstats view must carry the same numbers.
        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(
            nerdstats["total_reasoning_tokens"],
            Value::Number(4u64.into())
        );
        assert_eq!(
            nerdstats["step_details"][0]["reasoning_tokens"],
            Value::Number(4u64.into())
        );
    }
6534
    /// When a node performs a tool roundtrip, reasoning tokens from every
    /// provider call in the loop should be accumulated into the node's totals.
    #[tokio::test]
    async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
        let yaml = r#"
id: tool-reasoning-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // ToolLoopReasoningProvider reports reasoning tokens on the calls of
        // the tool roundtrip.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopReasoningProvider))
            .build()
            .expect("client should build");
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        // 5 = accumulated reasoning tokens across the roundtrip's calls.
        assert_eq!(output.total_reasoning_tokens, Some(5));
        assert_eq!(output.step_timings.len(), 1);
        assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(
            nerdstats["total_reasoning_tokens"],
            Value::Number(5u64.into())
        );
        assert_eq!(
            nerdstats["step_details"][0]["reasoning_tokens"],
            Value::Number(5u64.into())
        );
    }
6603
    /// A tool result that violates the tool's declared `output_schema` must
    /// hard-fail the node with an LLM-stage error, not be passed through.
    #[tokio::test]
    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
        let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // Worker payload deliberately does not match the tool output_schema
        // (missing required `customer_name`, extra property).
        let worker = FixedToolWorker {
            payload: json!({"unexpected": "shape"}),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should hard-fail on schema mismatch");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("output failed schema validation"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }
    }
6665
    /// If the model requests a tool that is not declared on the node, the run
    /// must fail before the custom worker is ever invoked.
    #[tokio::test]
    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
        let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // UnknownToolProvider makes the model ask for a tool named
        // 'unknown_tool' that the node does not declare.
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(UnknownToolProvider))
            .build()
            .expect("client should build");
        // Counts how often the worker's execute path is reached.
        let worker = CountingToolWorker {
            execute_calls: AtomicUsize::new(0),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None,
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should reject unknown tool before executing worker");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }

        // The worker must never have been executed.
        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
    }
6722
6723 #[test]
6724 fn validates_tools_format_mismatch() {
6725 let yaml = r#"
6726id: mismatch
6727entry_node: generate
6728nodes:
6729 - id: generate
6730 node_type:
6731 llm_call:
6732 model: gpt-4.1
6733 tools_format: openai
6734 tools:
6735 - name: get_customer_context
6736 input_schema:
6737 type: object
6738 properties:
6739 order_id: { type: string }
6740 required: [order_id]
6741"#;
6742
6743 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6744 let diagnostics = verify_yaml_workflow(&workflow);
6745 assert!(diagnostics
6746 .iter()
6747 .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
6748 }
6749
6750 #[test]
6751 fn mock_custom_worker_supports_get_employee_record() {
6752 let result = mock_custom_worker_output(
6753 "get_employee_record",
6754 &json!({"employee_name": "Alex Johnson"}),
6755 )
6756 .expect("mock tool should resolve employee record");
6757
6758 assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
6759 assert_eq!(result["location"], Value::String("Austin".to_string()));
6760 }
6761
    /// Custom workers must receive the caller-supplied trace context (ids and
    /// tenant fields) inside their context payload under `trace.context` /
    /// `trace.tenant`.
    #[tokio::test]
    async fn custom_worker_receives_trace_context_block() {
        let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: demo
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Worker that stores the context it was called with.
        let worker = CapturingWorker {
            context: Mutex::new(None),
        };
        // Fixed trace ids and tenant fields so the assertions are exact.
        let options = YamlWorkflowRunOptions {
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-ctx".to_string()),
                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                    request_id: Some("turn-7".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            Some(&worker),
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        let captured_context = worker
            .context
            .lock()
            .expect("capturing worker lock should not be poisoned")
            .clone()
            .expect("custom worker should receive context");

        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("trace_id"))
                .and_then(Value::as_str),
            Some("trace-fixed-ctx")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("traceparent"))
                .and_then(Value::as_str),
            Some("00-trace-fixed-ctx-span-fixed-01")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
        );
    }
6840
    /// A cancelling event sink must stop the run before the LLM executor is
    /// invoked even once.
    #[tokio::test]
    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
        let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Counts executor invocations so we can assert it was never reached.
        let executor = CountingExecutor {
            call_count: AtomicUsize::new(0),
        };
        // Sink that signals cancellation after the first event.
        let sink = CancelAfterFirstEventSink {
            cancelled: AtomicBool::new(false),
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &executor,
            None,
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should stop when sink signals cancellation");

        assert!(matches!(
            err,
            YamlWorkflowRunError::EventSinkCancelled { .. }
        ));
        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
    }
6882
6883 #[tokio::test]
6884 async fn rejects_invalid_messages_path_shape() {
6885 let yaml = r#"
6886id: email-intake-classification
6887entry_node: classify_top_level
6888nodes:
6889 - id: classify_top_level
6890 node_type:
6891 llm_call:
6892 model: gpt-4.1
6893 messages_path: input.messages
6894"#;
6895
6896 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6897 let input = json!({
6898 "email_text": "ignored",
6899 "messages": "not-a-list"
6900 });
6901
6902 let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6903 .await
6904 .expect_err("workflow should fail for invalid messages shape");
6905
6906 assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
6907 }
6908
    /// Switch branches should render as labelled Mermaid edges ("route1",
    /// "default") while plain edges stay unlabelled.
    #[test]
    fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
        let yaml = r#"
id: chat-workflow
entry_node: decide
nodes:
  - id: decide
    node_type:
      switch:
        branches:
          - condition: '$.input.mode == "draft"'
            target: draft
        default: ask
  - id: draft
    node_type:
      llm_call:
        model: gpt-4.1
  - id: ask
    node_type:
      llm_call:
        model: gpt-4.1
edges:
  - from: draft
    to: ask
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let mermaid = yaml_workflow_to_mermaid(&workflow);

        assert!(mermaid.contains("flowchart TD"));
        assert!(mermaid.contains("decide -- \"route1\" --> draft"));
        assert!(mermaid.contains("decide -- \"default\" --> ask"));
        assert!(mermaid.contains("draft --> ask"));
    }
6943
    /// Tools attached to an llm_call node should render as separate Mermaid
    /// nodes (`<node>__tool_<i>`), linked with a dotted edge and styled via
    /// the `toolNode` class.
    #[test]
    fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
        let yaml = r#"
id: tool-graph
entry_node: chat
nodes:
  - id: chat
    node_type:
      llm_call:
        model: gemini-3-flash
        tools_format: simplified
        tools:
          - name: run_workflow_graph
            input_schema:
              type: object
              properties:
                workflow_id: { type: string }
              required: [workflow_id]
edges: []
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let mermaid = yaml_workflow_to_mermaid(&workflow);

        assert!(mermaid.contains("chat__tool_0"));
        assert!(mermaid.contains("chat -.-> chat__tool_0"));
        assert!(mermaid.contains("classDef toolNode"));
        assert!(mermaid.contains("class chat__tool_0 toolNode;"));
    }
6973
6974 #[test]
6975 fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6976 let base_dir = std::env::temp_dir().join(format!(
6977 "simple_agents_mermaid_subgraph_{}",
6978 std::time::SystemTime::now()
6979 .duration_since(std::time::UNIX_EPOCH)
6980 .expect("unix epoch")
6981 .as_nanos()
6982 ));
6983 fs::create_dir_all(&base_dir).expect("temp dir should be created");
6984
6985 let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6986 let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6987
6988 let orchestrator_yaml = r#"
6989id: email-chat-orchestrator-with-subgraph-tool
6990entry_node: respond_casual
6991nodes:
6992 - id: respond_casual
6993 node_type:
6994 llm_call:
6995 model: gemini-3-flash
6996 tools_format: simplified
6997 tools:
6998 - name: run_workflow_graph
6999 input_schema:
7000 type: object
7001 properties:
7002 workflow_id: { type: string }
7003 required: [workflow_id]
7004 config:
7005 prompt: |
7006 Call with:
7007 {
7008 "workflow_id": "hr_warning_email_subgraph"
7009 }
7010edges: []
7011"#;
7012
7013 let subgraph_yaml = r#"
7014id: hr-warning-email-subgraph
7015entry_node: draft_hr_warning_email
7016nodes:
7017 - id: draft_hr_warning_email
7018 node_type:
7019 llm_call:
7020 model: gemini-3-flash
7021edges: []
7022"#;
7023
7024 fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
7025 fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
7026
7027 let mermaid =
7028 yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
7029
7030 assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
7031 assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
7032 assert!(mermaid.contains("calls hr_warning_email_subgraph"));
7033 assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
7034
7035 fs::remove_dir_all(base_dir).expect("temp dir removed");
7036 }
7037
    /// Converting a YAML workflow to the IR should keep every declared node
    /// and prepend the synthetic `__yaml_start` entry node.
    #[test]
    fn converts_yaml_workflow_to_ir_definition() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            classify
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.kind == "x"'
            target: done
        default: done
  - id: done
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: test
edges:
  - from: classify
    to: route
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");

        assert_eq!(ir.name, "chat-workflow");
        // Synthetic entry node injected by the conversion.
        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
        assert!(ir.nodes.iter().any(|n| n.id == "route"));
        assert!(ir.nodes.iter().any(|n| n.id == "done"));
    }
7079
    /// An llm_call node using `messages_path` should convert to an IR Tool
    /// node that targets the synthetic `__yaml_llm_call` tool.
    #[test]
    fn supports_yaml_to_ir_when_messages_path_is_used() {
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir =
            yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
        assert!(ir.nodes.iter().any(|node| matches!(
            node.kind,
            crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
        )));
    }
7101
7102 #[tokio::test]
7103 async fn workflow_output_contains_trace_id_in_both_locations() {
7104 let yaml = r#"
7105id: trace-test
7106entry_node: classify
7107nodes:
7108 - id: classify
7109 node_type:
7110 llm_call:
7111 model: gpt-4.1
7112 config:
7113 prompt: |
7114 Classify this email into exactly one category:
7115 {{ input.email_text }}
7116"#;
7117
7118 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7119 let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
7120 .await
7121 .expect("workflow should execute");
7122
7123 let trace_id = output
7124 .trace_id
7125 .as_deref()
7126 .expect("trace_id should be present");
7127 assert!(!trace_id.is_empty());
7128 assert_eq!(
7129 output.metadata.as_ref().and_then(|value| {
7130 value
7131 .get("telemetry")
7132 .and_then(|telemetry| telemetry.get("trace_id"))
7133 .and_then(Value::as_str)
7134 }),
7135 Some(trace_id)
7136 );
7137 }
7138
    /// With `sample_rate: 0.0` the run still carries the supplied trace id
    /// but `metadata.telemetry.sampled` must be `false`.
    #[tokio::test]
    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
        let yaml = r#"
id: sample-rate-zero
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-zero".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // The caller-provided trace id is preserved even when unsampled.
        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
7193
7194 #[test]
7195 fn trace_id_from_traceparent_parses_w3c_header() {
7196 let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
7197 assert_eq!(
7198 trace_id_from_traceparent(traceparent).as_deref(),
7199 Some("4bf92f3577b34da6a3ce929d0e0e4736")
7200 );
7201 }
7202
7203 #[test]
7204 fn resolve_telemetry_context_marks_traceparent_source() {
7205 let mut options = YamlWorkflowRunOptions::default();
7206 options.trace.context = Some(YamlWorkflowTraceContextInput {
7207 traceparent: Some(
7208 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
7209 ),
7210 ..YamlWorkflowTraceContextInput::default()
7211 });
7212
7213 let context = resolve_telemetry_context(&options, None);
7214
7215 assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
7216 assert_eq!(
7217 context.trace_id.as_deref(),
7218 Some("4bf92f3577b34da6a3ce929d0e0e4736")
7219 );
7220 }
7221
    /// When no explicit trace id is given, the run's trace id must be derived
    /// from the `traceparent` header's trace-id field.
    #[tokio::test]
    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
        let yaml = r#"
id: traceparent-derived
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        // Only `traceparent` is supplied — no `trace_id`.
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    traceparent: Some(traceparent.to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        // Trace id is the second field of the traceparent header.
        assert_eq!(
            output.trace_id.as_deref(),
            Some("4bf92f3577b34da6a3ce929d0e0e4736")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
7280
    /// With `sample_rate: 1.0` the run keeps the supplied trace id and
    /// `metadata.telemetry.sampled` must be `true`.
    #[tokio::test]
    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
        let yaml = r#"
id: sample-rate-one
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.0,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-one".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None,
            None,
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(true)
        );
    }
7335
    /// For a fixed trace id and a fractional sample rate, the sampling
    /// decision must be deterministic: repeated runs give the same `sampled`
    /// flag.
    #[tokio::test]
    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
        let yaml = r#"
id: sample-rate-deterministic
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.5,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-deterministic".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        // Run the identical workflow three times and collect the flag.
        let mut sampled_values = Vec::new();
        for _ in 0..3 {
            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
                &workflow,
                &json!({"email_text":"hello"}),
                &MockExecutor,
                None,
                None,
                &options,
            )
            .await
            .expect("workflow should execute");

            let sampled = output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool)
                .expect("sampled flag should be present");
            sampled_values.push(sampled);
        }

        // All runs must agree (we do not assert which value — only stability).
        assert!(sampled_values
            .iter()
            .all(|value| *value == sampled_values[0]));
    }
7395
7396 #[tokio::test]
7397 async fn workflow_run_options_reject_invalid_sample_rate() {
7398 let yaml = r#"
7399id: sample-rate-invalid
7400entry_node: classify
7401nodes:
7402 - id: classify
7403 node_type:
7404 llm_call:
7405 model: gpt-4.1
7406 config:
7407 prompt: |
7408 Classify this email into exactly one category:
7409 {{ input.email_text }}
7410"#;
7411
7412 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7413 let options = YamlWorkflowRunOptions {
7414 telemetry: YamlWorkflowTelemetryConfig {
7415 sample_rate: 1.1,
7416 ..YamlWorkflowTelemetryConfig::default()
7417 },
7418 ..YamlWorkflowRunOptions::default()
7419 };
7420
7421 let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
7422 &workflow,
7423 &json!({"email_text":"hello"}),
7424 &MockExecutor,
7425 None,
7426 None,
7427 &options,
7428 )
7429 .await
7430 .expect_err("invalid sample_rate should fail");
7431
7432 match err {
7433 YamlWorkflowRunError::InvalidInput { message } => {
7434 assert!(message.contains("telemetry.sample_rate"));
7435 }
7436 _ => panic!("expected invalid input error for sample_rate"),
7437 }
7438 }
7439
7440 #[tokio::test]
7441 async fn workflow_run_options_reject_nan_sample_rate() {
7442 let yaml = r#"
7443id: sample-rate-invalid
7444entry_node: classify
7445nodes:
7446 - id: classify
7447 node_type:
7448 llm_call:
7449 model: gpt-4.1
7450 config:
7451 prompt: |
7452 Classify this email into exactly one category:
7453 {{ input.email_text }}
7454"#;
7455
7456 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7457 let options = YamlWorkflowRunOptions {
7458 telemetry: YamlWorkflowTelemetryConfig {
7459 sample_rate: f32::NAN,
7460 ..YamlWorkflowTelemetryConfig::default()
7461 },
7462 ..YamlWorkflowRunOptions::default()
7463 };
7464
7465 let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
7466 &workflow,
7467 &json!({"email_text":"hello"}),
7468 &MockExecutor,
7469 None,
7470 None,
7471 &options,
7472 )
7473 .await
7474 .expect_err("nan sample_rate should fail");
7475
7476 assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7477 }
7478
7479 #[test]
7480 fn subworkflow_options_inherit_parent_telemetry_configuration() {
7481 let mut parent_options = YamlWorkflowRunOptions::default();
7482 parent_options.telemetry.sample_rate = 0.0;
7483 parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
7484 parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
7485 parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
7486
7487 let parent_context = TraceContext {
7488 trace_id: Some("trace-parent".to_string()),
7489 span_id: Some("span-parent".to_string()),
7490 parent_span_id: None,
7491 traceparent: Some("00-trace-parent-span-parent-01".to_string()),
7492 tracestate: None,
7493 baggage: BTreeMap::new(),
7494 };
7495
7496 let subworkflow_options =
7497 build_subworkflow_options(&parent_options, Some(&parent_context), None);
7498
7499 assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
7500 assert_eq!(
7501 subworkflow_options.telemetry.payload_mode,
7502 YamlWorkflowPayloadMode::RedactedPayload
7503 );
7504 assert_eq!(
7505 subworkflow_options.telemetry.tool_trace_mode,
7506 YamlToolTraceMode::Redacted
7507 );
7508 assert_eq!(
7509 subworkflow_options.trace.tenant.conversation_id.as_deref(),
7510 Some("conv-123")
7511 );
7512 assert_eq!(
7513 subworkflow_options
7514 .trace
7515 .context
7516 .as_ref()
7517 .and_then(|ctx| ctx.trace_id.as_deref()),
7518 Some("trace-parent")
7519 );
7520 }
7521
7522 #[test]
7523 fn trace_tenant_attributes_include_langfuse_aliases() {
7524 let tenant = YamlWorkflowTraceTenantContext {
7525 workspace_id: Some("ws-1".to_string()),
7526 user_id: Some("user-1".to_string()),
7527 conversation_id: Some("conv-1".to_string()),
7528 request_id: Some("req-1".to_string()),
7529 run_id: Some("run-1".to_string()),
7530 };
7531 let mut span = CapturingSpan::default();
7532 apply_trace_tenant_attributes_from_tenant(&mut span, &tenant);
7533
7534 assert_eq!(
7535 span.attributes
7536 .get("tenant.workspace_id")
7537 .map(String::as_str),
7538 Some("ws-1")
7539 );
7540 assert_eq!(
7541 span.attributes.get("tenant.user_id").map(String::as_str),
7542 Some("user-1")
7543 );
7544 assert_eq!(
7545 span.attributes
7546 .get("tenant.conversation_id")
7547 .map(String::as_str),
7548 Some("conv-1")
7549 );
7550 assert_eq!(
7551 span.attributes.get("langfuse.user.id").map(String::as_str),
7552 Some("user-1")
7553 );
7554 assert_eq!(
7555 span.attributes
7556 .get("langfuse.session.id")
7557 .map(String::as_str),
7558 Some("conv-1")
7559 );
7560 }
7561
7562 #[test]
7563 fn langfuse_nerdstats_attributes_are_written_when_enabled() {
7564 let output = YamlWorkflowRunOutput {
7565 workflow_id: "wf-1".to_string(),
7566 entry_node: "start".to_string(),
7567 email_text: "email".to_string(),
7568 trace: vec!["node-1".to_string()],
7569 outputs: BTreeMap::new(),
7570 terminal_node: "node-1".to_string(),
7571 terminal_output: None,
7572 step_timings: vec![YamlStepTiming {
7573 node_id: "node-1".to_string(),
7574 node_kind: "llm_call".to_string(),
7575 model_name: Some("gpt-4.1-mini".to_string()),
7576 elapsed_ms: 42,
7577 prompt_tokens: Some(4),
7578 completion_tokens: Some(6),
7579 total_tokens: Some(10),
7580 reasoning_tokens: Some(2),
7581 tokens_per_second: Some(14.0),
7582 }],
7583 llm_node_metrics: BTreeMap::new(),
7584 llm_node_models: BTreeMap::new(),
7585 total_elapsed_ms: 42,
7586 ttft_ms: Some(7),
7587 total_input_tokens: 4,
7588 total_output_tokens: 6,
7589 total_tokens: 10,
7590 total_reasoning_tokens: Some(2),
7591 tokens_per_second: 14.0,
7592 trace_id: Some("trace-1".to_string()),
7593 metadata: None,
7594 };
7595
7596 let mut span = CapturingSpan::default();
7597 apply_langfuse_nerdstats_attributes(&mut span, &output, true);
7598
7599 assert_eq!(
7600 span.attributes
7601 .get("langfuse.trace.metadata.nerdstats.workflow_id")
7602 .map(String::as_str),
7603 Some("wf-1")
7604 );
7605 assert_eq!(
7606 span.attributes
7607 .get("langfuse.trace.metadata.nerdstats.total_tokens")
7608 .map(String::as_str),
7609 Some("10")
7610 );
7611 assert_eq!(
7612 span.attributes
7613 .get("langfuse.trace.metadata.nerdstats.token_metrics_available")
7614 .map(String::as_str),
7615 Some("true")
7616 );
7617 assert!(span
7618 .attributes
7619 .contains_key("langfuse.trace.metadata.nerdstats"));
7620 }
7621
7622 #[test]
7623 fn langfuse_trace_input_output_and_usage_are_written() {
7624 let output = YamlWorkflowRunOutput {
7625 workflow_id: "wf-1".to_string(),
7626 entry_node: "start".to_string(),
7627 email_text: "email".to_string(),
7628 trace: vec!["node-1".to_string()],
7629 outputs: BTreeMap::new(),
7630 terminal_node: "node-1".to_string(),
7631 terminal_output: Some(json!({"final":"ok"})),
7632 step_timings: Vec::new(),
7633 llm_node_metrics: BTreeMap::new(),
7634 llm_node_models: BTreeMap::new(),
7635 total_elapsed_ms: 10,
7636 ttft_ms: None,
7637 total_input_tokens: 11,
7638 total_output_tokens: 22,
7639 total_tokens: 33,
7640 total_reasoning_tokens: Some(4),
7641 tokens_per_second: 1.5,
7642 trace_id: Some("trace-1".to_string()),
7643 metadata: None,
7644 };
7645
7646 let mut span = CapturingSpan::default();
7647 let input = json!({"email_text":"hello"});
7648 apply_langfuse_trace_input_output_attributes(
7649 &mut span,
7650 &input,
7651 &output,
7652 YamlWorkflowPayloadMode::FullPayload,
7653 );
7654
7655 assert_eq!(
7656 span.attributes
7657 .get("langfuse.trace.input")
7658 .map(String::as_str),
7659 Some("{\"email_text\":\"hello\"}")
7660 );
7661 assert_eq!(
7662 span.attributes
7663 .get("langfuse.trace.output")
7664 .map(String::as_str),
7665 Some("{\"final\":\"ok\"}")
7666 );
7667 assert!(span
7668 .attributes
7669 .contains_key("langfuse.trace.metadata.usage_details"));
7670 }
7671
7672 #[test]
7673 fn langfuse_observation_usage_attributes_are_written() {
7674 let usage = YamlLlmTokenUsage {
7675 prompt_tokens: 7,
7676 completion_tokens: 9,
7677 total_tokens: 16,
7678 reasoning_tokens: Some(3),
7679 };
7680 let mut span = CapturingSpan::default();
7681 apply_langfuse_observation_usage_attributes(&mut span, &usage);
7682
7683 assert_eq!(
7684 span.attributes
7685 .get("gen_ai.usage.input_tokens")
7686 .map(String::as_str),
7687 Some("7")
7688 );
7689 assert_eq!(
7690 span.attributes
7691 .get("gen_ai.usage.output_tokens")
7692 .map(String::as_str),
7693 Some("9")
7694 );
7695 assert_eq!(
7696 span.attributes
7697 .get("gen_ai.usage.total_tokens")
7698 .map(String::as_str),
7699 Some("16")
7700 );
7701 assert_eq!(
7702 span.attributes
7703 .get("gen_ai.usage.reasoning_tokens")
7704 .map(String::as_str),
7705 Some("3")
7706 );
7707 assert!(span
7708 .attributes
7709 .contains_key("langfuse.observation.usage_details"));
7710 }
7711
7712 #[tokio::test]
7713 async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
7714 let yaml = r#"
7715id: trace-options-test
7716entry_node: classify
7717nodes:
7718 - id: classify
7719 node_type:
7720 llm_call:
7721 model: gpt-4.1
7722 config:
7723 prompt: |
7724 Classify this email into exactly one category:
7725 {{ input.email_text }}
7726"#;
7727
7728 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7729 let options = YamlWorkflowRunOptions {
7730 telemetry: YamlWorkflowTelemetryConfig {
7731 payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
7732 ..YamlWorkflowTelemetryConfig::default()
7733 },
7734 trace: YamlWorkflowTraceOptions {
7735 context: Some(YamlWorkflowTraceContextInput {
7736 trace_id: Some("trace-fixed-123".to_string()),
7737 traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
7738 ..YamlWorkflowTraceContextInput::default()
7739 }),
7740 tenant: YamlWorkflowTraceTenantContext {
7741 conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
7742 ..YamlWorkflowTraceTenantContext::default()
7743 },
7744 },
7745 model: None,
7746 };
7747
7748 let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
7749 &workflow,
7750 &json!({"email_text":"hello"}),
7751 &MockExecutor,
7752 None,
7753 None,
7754 &options,
7755 )
7756 .await
7757 .expect("workflow should execute");
7758
7759 assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
7760 assert_eq!(
7761 output
7762 .metadata
7763 .as_ref()
7764 .and_then(|value| value.get("telemetry"))
7765 .and_then(|telemetry| telemetry.get("payload_mode"))
7766 .and_then(Value::as_str),
7767 Some("redacted_payload")
7768 );
7769 assert_eq!(
7770 output
7771 .metadata
7772 .as_ref()
7773 .and_then(|value| value.get("trace"))
7774 .and_then(|trace| trace.get("tenant"))
7775 .and_then(|tenant| tenant.get("conversation_id"))
7776 .and_then(Value::as_str),
7777 Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
7778 );
7779 }
7780
7781 #[test]
7782 fn streamed_payload_parser_extracts_last_json_object() {
7783 let raw = r#"{"state":"missing_scenario","reason":"ok"}
7784
7785extra reasoning text
7786
7787{"state":"ready","reason":"final"}"#;
7788
7789 let resolved = parse_streamed_structured_payload(raw, false)
7790 .expect("parser should extract final JSON object");
7791 assert_eq!(resolved.payload["state"], "ready");
7792 assert!(resolved.heal_confidence.is_none());
7793 }
7794
7795 #[test]
7796 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
7797 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7798
7799 let resolved = parse_streamed_structured_payload(raw, false)
7800 .expect("parser should recover final structured JSON object");
7801 assert_eq!(resolved.payload["state"], "ready");
7802 }
7803
7804 #[test]
7805 fn streamed_payload_parser_handles_markdown_with_heal() {
7806 let raw = r#"Some preface
7807```json
7808{
7809 "state": "missing_scenario",
7810 "reason": "Need more details"
7811}
7812```
7813Some trailing explanation"#;
7814
7815 let resolved = parse_streamed_structured_payload(raw, true)
7816 .expect("heal path should parse JSON block");
7817 assert_eq!(resolved.payload["state"], "missing_scenario");
7818 assert!(resolved.heal_confidence.is_some());
7819 }
7820
7821 #[test]
7822 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7823 let raw = "No JSON in this streamed output";
7824 let error = parse_streamed_structured_payload(raw, false)
7825 .expect_err("strict stream parse should fail without JSON candidate");
7826 assert!(error.contains("no JSON object candidate found"));
7827 }
7828
7829 #[test]
7830 fn include_raw_stream_debug_events_defaults_to_false() {
7831 let _guard = stream_debug_env_lock()
7832 .lock()
7833 .expect("stream debug env lock should not be poisoned");
7834 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7835 assert!(!include_raw_stream_debug_events());
7836 }
7837
7838 #[test]
7839 fn include_raw_stream_debug_events_accepts_truthy_values() {
7840 let _guard = stream_debug_env_lock()
7841 .lock()
7842 .expect("stream debug env lock should not be poisoned");
7843 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7844 assert!(include_raw_stream_debug_events());
7845 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7846 }
7847
7848 #[test]
7849 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7850 let mut filter = StructuredJsonDeltaFilter::default();
7851 let chunks = vec![
7852 "I will think first... ",
7853 "{\"state\":\"missing_scenario\",",
7854 "\"reason\":\"Need more details\"}",
7855 " additional commentary",
7856 ];
7857
7858 let filtered = chunks
7859 .into_iter()
7860 .filter_map(|chunk| filter.split(chunk).0)
7861 .collect::<String>();
7862
7863 assert_eq!(
7864 filtered,
7865 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7866 );
7867 }
7868
7869 #[test]
7870 fn structured_json_delta_filter_handles_braces_inside_strings() {
7871 let mut filter = StructuredJsonDeltaFilter::default();
7872 let chunks = vec![
7873 "preface ",
7874 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7875 " trailing",
7876 ];
7877
7878 let filtered = chunks
7879 .into_iter()
7880 .filter_map(|chunk| filter.split(chunk).0)
7881 .collect::<String>();
7882
7883 assert_eq!(
7884 filtered,
7885 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7886 );
7887 }
7888
7889 #[test]
7890 fn render_json_object_as_text_converts_top_level_fields() {
7891 let rendered =
7892 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7893 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7894
7895 assert_eq!(lines.len(), 3);
7896 assert!(lines.contains("question: q"));
7897 assert!(lines.contains("confidence: 0.8"));
7898 assert!(lines.contains("nested: {\"a\":1}"));
7899 }
7900
7901 #[test]
7902 fn stream_json_as_text_formatter_emits_once_when_complete() {
7903 let mut formatter = StreamJsonAsTextFormatter::default();
7904 formatter.push("{\"question\":\"hello\"}");
7905
7906 let first = formatter.emit_if_ready(true);
7907 let second = formatter.emit_if_ready(true);
7908
7909 assert_eq!(first, Some("question: hello".to_string()));
7910 assert_eq!(second, None);
7911 }
7912
7913 #[test]
7914 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7915 let expr = "$.nodes.classify.output.output_total == 1";
7916 let rewritten = rewrite_yaml_condition_to_ir(expr);
7917 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7918 }
7919
7920 #[tokio::test]
7921 async fn validates_workflow_input_before_ir_runtime_path() {
7922 let yaml = r#"
7923id: chat-workflow
7924entry_node: classify
7925nodes:
7926 - id: classify
7927 node_type:
7928 llm_call:
7929 model: gpt-4.1
7930 config:
7931 prompt: |
7932 classify
7933"#;
7934
7935 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7936 let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
7937 .await
7938 .expect_err("non-object input should fail before execution");
7939
7940 assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7941 }
7942
7943 #[test]
7944 fn interpolate_template_supports_dollar_prefixed_paths() {
7945 let context = json!({
7946 "input": {
7947 "email_text": "hello"
7948 }
7949 });
7950
7951 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7952 assert_eq!(rendered, "value=hello");
7953 }
7954}