1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::Path;
3use std::time::Instant;
4
5use async_trait::async_trait;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use simple_agent_type::message::{Message, Role};
10use simple_agent_type::request::CompletionRequest;
11use simple_agent_type::response::FinishReason;
12use simple_agent_type::tool::{
13 ToolCall, ToolChoice, ToolChoiceFunction, ToolChoiceMode, ToolChoiceTool, ToolDefinition,
14 ToolFunction, ToolType,
15};
16use simple_agents_core::{
17 CompletionMode, CompletionOptions, CompletionOutcome, SimpleAgentsClient,
18};
19use simple_agents_healing::JsonishParser;
20use thiserror::Error;
21
22use crate::ir::{Node, NodeKind, RouterRoute, WorkflowDefinition, WORKFLOW_IR_V0};
23use crate::observability::tracing::{
24 flush_workflow_tracer, workflow_tracer, SpanKind, TraceContext, WorkflowSpan,
25};
26use crate::runtime::{
27 LlmExecutionError, LlmExecutionInput, LlmExecutionOutput, LlmExecutor, ToolExecutionError,
28 ToolExecutionInput, ToolExecutor, WorkflowRuntime, WorkflowRuntimeError,
29 WorkflowRuntimeOptions,
30};
31use crate::visualize::workflow_to_mermaid;
32
// Synthetic node ids used when lowering a YAML workflow to IR
// (presumably the injected entry node and the LLM tool-call node —
// confirm at the yaml_workflow_to_ir use sites).
const YAML_START_NODE_ID: &str = "__yaml_start";
const YAML_LLM_TOOL_ID: &str = "__yaml_llm_call";

// Process-wide counter mixed into generated trace ids so ids created within
// the same nanosecond still differ (see `generate_trace_id`).
static TRACE_ID_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
37
/// Timing (and, for LLM steps, token usage) recorded for one executed
/// workflow node. Optional fields are omitted from serialized JSON.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlStepTiming {
    /// Id of the executed node.
    pub node_id: String,
    /// Node kind label (e.g. "llm_call"; see `workflow_nerdstats`).
    pub node_kind: String,
    /// Model used, when the step performed an LLM call.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_name: Option<String>,
    /// Wall-clock duration of the step in milliseconds.
    pub elapsed_ms: u128,
    /// Provider-reported prompt tokens, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prompt_tokens: Option<u32>,
    /// Provider-reported completion tokens, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completion_tokens: Option<u32>,
    /// Provider-reported total tokens, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_tokens: Option<u32>,
    /// Provider-reported reasoning tokens, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Completion tokens per second for this step, when computable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tokens_per_second: Option<f64>,
}
56
/// Aggregated metrics for a single LLM node where provider usage was
/// available (unlike `YamlStepTiming`, token counts here are required).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmNodeMetrics {
    /// Wall-clock duration of the node in milliseconds.
    pub elapsed_ms: u128,
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning tokens, when the provider reports them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Completion tokens per second (rounded to two decimals).
    pub tokens_per_second: f64,
}
67
/// Final result of executing a YAML workflow: the path taken, per-node
/// outputs, timing/token rollups, and the telemetry trace id (when enabled).
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowRunOutput {
    pub workflow_id: String,
    /// Node the run started at.
    pub entry_node: String,
    /// Input text of the run (field name suggests an email-processing
    /// workflow — confirm against callers).
    pub email_text: String,
    /// Ordered list of visited node ids.
    pub trace: Vec<String>,
    /// Output payload per node id (BTreeMap for stable serialization order).
    pub outputs: BTreeMap<String, Value>,
    /// Node the run ended at.
    pub terminal_node: String,
    /// Output of the terminal node, if it produced one.
    pub terminal_output: Option<Value>,
    /// Per-step timing details, in execution order.
    pub step_timings: Vec<YamlStepTiming>,
    /// Aggregated metrics keyed by LLM node id.
    pub llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics>,
    /// Resolved model name keyed by LLM node id.
    pub llm_node_models: BTreeMap<String, String>,
    /// Total wall-clock duration of the run in milliseconds.
    pub total_elapsed_ms: u128,
    /// Time to first token, when streaming produced one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<u128>,
    pub total_input_tokens: u64,
    pub total_output_tokens: u64,
    pub total_tokens: u64,
    /// Summed reasoning tokens; `None` when no step reported any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_reasoning_tokens: Option<u64>,
    /// Run-level output tokens per second.
    pub tokens_per_second: f64,
    /// Telemetry trace id; absent when telemetry is disabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Run metadata blob (see `workflow_metadata_with_trace`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
94
/// Whether span attributes carry full payloads or only a redacted
/// type marker (see `payload_for_span`). Defaults to full payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowPayloadMode {
    #[default]
    FullPayload,
    RedactedPayload,
}
102
/// How tool-call arguments/outputs are recorded in traces
/// (see `payload_for_tool_trace`): verbatim, type-only, or not at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolTraceMode {
    #[default]
    Full,
    Redacted,
    Off,
}
111
/// Caller-supplied trace propagation fields (W3C-style), all optional.
/// Used both to seed the runtime `TraceContext` and to resolve the run's
/// trace id (explicit `trace_id` wins over `traceparent`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceContextInput {
    #[serde(default)]
    pub trace_id: Option<String>,
    #[serde(default)]
    pub span_id: Option<String>,
    #[serde(default)]
    pub parent_span_id: Option<String>,
    /// W3C `traceparent` header value.
    #[serde(default)]
    pub traceparent: Option<String>,
    /// W3C `tracestate` header value.
    #[serde(default)]
    pub tracestate: Option<String>,
    /// W3C baggage key/value pairs.
    #[serde(default)]
    pub baggage: BTreeMap<String, String>,
}
127
/// Optional tenant identifiers attached to spans as `tenant.*` attributes
/// and embedded in run metadata / custom worker context.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceTenantContext {
    #[serde(default)]
    pub workspace_id: Option<String>,
    #[serde(default)]
    pub user_id: Option<String>,
    #[serde(default)]
    pub conversation_id: Option<String>,
    #[serde(default)]
    pub request_id: Option<String>,
    #[serde(default)]
    pub run_id: Option<String>,
}
141
/// Telemetry knobs for a workflow run. Serde defaults mirror the
/// `Default` impl below (enabled, nerdstats on, sample everything,
/// full payloads, 30-day retention, multi-tenant, full tool traces).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct YamlWorkflowTelemetryConfig {
    /// Master switch; when false no trace id is resolved at all.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Whether to attach the "nerdstats" diagnostic blob.
    #[serde(default = "default_true")]
    pub nerdstats: bool,
    /// Head-sampling rate in [0.0, 1.0]; validated by `validate_sample_rate`.
    #[serde(default = "default_sample_rate")]
    pub sample_rate: f32,
    #[serde(default)]
    pub payload_mode: YamlWorkflowPayloadMode,
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    #[serde(default = "default_true")]
    pub multi_tenant: bool,
    #[serde(default)]
    pub tool_trace_mode: YamlToolTraceMode,
}
159
impl Default for YamlWorkflowTelemetryConfig {
    /// Matches the serde field defaults (`default_true`,
    /// `default_sample_rate`, `default_retention_days`) — keep in sync.
    fn default() -> Self {
        Self {
            enabled: true,
            nerdstats: true,
            sample_rate: 1.0,
            payload_mode: YamlWorkflowPayloadMode::FullPayload,
            retention_days: 30,
            multi_tenant: true,
            tool_trace_mode: YamlToolTraceMode::Full,
        }
    }
}
173
/// Trace-related run options: optional incoming propagation context plus
/// tenant identifiers (tenant defaults to all-`None`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowTraceOptions {
    #[serde(default)]
    pub context: Option<YamlWorkflowTraceContextInput>,
    #[serde(default)]
    pub tenant: YamlWorkflowTraceTenantContext,
}
181
/// Top-level options for one workflow run: telemetry config, trace
/// propagation/tenant info, and an optional run-wide model override
/// (applied per node by `resolve_requested_model`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct YamlWorkflowRunOptions {
    #[serde(default)]
    pub telemetry: YamlWorkflowTelemetryConfig,
    #[serde(default)]
    pub trace: YamlWorkflowTraceOptions,
    /// When set (and non-blank), overrides every node's configured model.
    #[serde(default)]
    pub model: Option<String>,
}
191
/// Provider-reported token usage for a single LLM call; accumulated into
/// run totals via `YamlTokenTotals::add_usage`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlLlmTokenUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
    /// Reasoning tokens, when the provider reports them.
    pub reasoning_tokens: Option<u32>,
}
199
/// Result of executing one LLM node: the parsed payload, optional provider
/// usage, optional time-to-first-token, and any tool calls made en route.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlLlmExecutionResult {
    pub payload: Value,
    pub usage: Option<YamlLlmTokenUsage>,
    pub ttft_ms: Option<u128>,
    pub tool_calls: Vec<YamlToolCallTrace>,
}
207
/// Trace record for one tool invocation made during an LLM node.
/// `arguments`/`output` may be redacted per `YamlToolTraceMode`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlToolCallTrace {
    /// Provider-assigned tool-call id.
    pub id: String,
    /// Tool (function) name.
    pub name: String,
    pub arguments: Value,
    /// Tool result; `None` when it failed or tracing is off.
    pub output: Option<Value>,
    /// Status label (string-typed; values set at the call sites).
    pub status: String,
    pub elapsed_ms: u128,
    /// Error description when the call failed.
    pub error: Option<String>,
}
218
/// Running token totals across all LLM calls in a run.
/// `reasoning_tokens` stays `None` until some usage record reports them,
/// distinguishing "never reported" from "reported as zero".
#[derive(Debug, Clone, Default)]
struct YamlTokenTotals {
    input_tokens: u64,
    output_tokens: u64,
    total_tokens: u64,
    reasoning_tokens: Option<u64>,
}
226
227impl YamlTokenTotals {
228 fn add_usage(&mut self, usage: &YamlLlmTokenUsage) {
229 self.input_tokens += u64::from(usage.prompt_tokens);
230 self.output_tokens += u64::from(usage.completion_tokens);
231 self.total_tokens += u64::from(usage.total_tokens);
232
233 if let Some(reasoning_tokens) = usage.reasoning_tokens {
234 let next = self.reasoning_tokens.unwrap_or(0) + u64::from(reasoning_tokens);
235 self.reasoning_tokens = Some(next);
236 }
237 }
238
239 fn tokens_per_second(&self, elapsed_ms: u128) -> f64 {
240 if elapsed_ms == 0 {
241 return 0.0;
242 }
243 round_two_decimals((self.output_tokens as f64) * 1000.0 / (elapsed_ms as f64))
244 }
245}
246
/// Rounds `value` to two decimal places using `f64::round`
/// (ties round half-away-from-zero).
fn round_two_decimals(value: f64) -> f64 {
    let scaled = value * 100.0;
    scaled.round() / 100.0
}
250
251fn completion_tokens_per_second(completion_tokens: u32, elapsed_ms: u128) -> f64 {
252 if elapsed_ms == 0 {
253 return 0.0;
254 }
255 round_two_decimals((completion_tokens as f64) * 1000.0 / (elapsed_ms as f64))
256}
257
/// Picks the model for an LLM node: a non-blank run-level override wins,
/// otherwise the node's configured model is used.
fn resolve_requested_model(run_model_override: Option<&str>, node_model: &str) -> String {
    match run_model_override.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => trimmed.to_string(),
        _ => node_model.to_string(),
    }
}
270
/// Serde default helper: boolean fields that default to `true`.
fn default_true() -> bool {
    true
}

/// Serde default helper: sampling defaults to 100% (matches `Default`).
fn default_sample_rate() -> f32 {
    1.0
}

/// Serde default helper: retention defaults to 30 days (matches `Default`).
fn default_retention_days() -> u32 {
    30
}
282
283fn validate_sample_rate(sample_rate: f32) -> Result<(), YamlWorkflowRunError> {
284 if sample_rate.is_finite() && (0.0..=1.0).contains(&sample_rate) {
285 return Ok(());
286 }
287
288 Err(YamlWorkflowRunError::InvalidInput {
289 message: format!(
290 "telemetry.sample_rate must be between 0.0 and 1.0 inclusive; received {sample_rate}"
291 ),
292 })
293}
294
/// Deterministic head-sampling decision for a trace id.
///
/// Hashes the id with 64-bit FNV-1a, maps the hash onto [0, 1), and keeps
/// the trace when that ratio falls below `sample_rate`. A given id always
/// yields the same decision for a given rate.
fn should_sample_trace(trace_id: &str, sample_rate: f32) -> bool {
    if sample_rate >= 1.0 {
        return true;
    }
    if sample_rate <= 0.0 {
        return false;
    }

    // 64-bit FNV-1a over the raw id bytes.
    let hash = trace_id
        .as_bytes()
        .iter()
        .fold(0xcbf29ce484222325u64, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
        });

    ((hash as f64) / (u64::MAX as f64)) < f64::from(sample_rate)
}
312
/// Extracts the 32-hex-digit trace id from a W3C `traceparent` header value.
///
/// Accepts only the four-field `version-traceid-spanid-flags` shape, requires
/// a 2-hex-digit version and a 32-hex-digit, non-all-zero trace id, and
/// normalizes the result to lowercase. The span id and flags fields are not
/// validated beyond being present.
fn trace_id_from_traceparent(traceparent: &str) -> Option<String> {
    let fields: Vec<&str> = traceparent.split('-').collect();
    let [version, trace_id, _span_id, _flags] = fields[..] else {
        return None;
    };

    let all_hex = |s: &str| s.chars().all(|ch| ch.is_ascii_hexdigit());
    if version.len() != 2 || !all_hex(version) {
        return None;
    }
    if trace_id.len() != 32 || !all_hex(trace_id) {
        return None;
    }
    // An all-zero trace id is invalid per the W3C spec.
    if trace_id.bytes().all(|b| b == b'0') {
        return None;
    }

    Some(trace_id.to_ascii_lowercase())
}
339
/// Where the run's trace id came from, in precedence order
/// (see `resolve_run_trace_id_with_source`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TraceIdSource {
    /// Telemetry disabled — no trace id at all.
    Disabled,
    /// Explicit `trace_id` in the run options.
    ExplicitTraceId,
    /// Trace id inherited from the parent span context.
    ParentTraceId,
    /// Parsed from the run options' `traceparent` header.
    Traceparent,
    /// Parsed from the parent context's `traceparent` header.
    ParentTraceparent,
    /// Freshly generated for this run.
    Generated,
}
349
/// Outcome of telemetry resolution for a run: the trace id (if telemetry is
/// enabled), whether this run is sampled, and how the id was obtained.
#[derive(Debug, Clone)]
struct ResolvedTelemetryContext {
    trace_id: Option<String>,
    sampled: bool,
    trace_id_source: TraceIdSource,
}
356
/// Resolves the run's trace id and records where it came from.
///
/// Precedence (first hit wins):
/// 1. telemetry disabled → no trace id at all;
/// 2. explicit `trace_id` in the run options' trace context;
/// 3. trace id carried by the parent span context;
/// 4. `traceparent` header in the run options;
/// 5. `traceparent` header in the parent span context;
/// 6. otherwise a freshly generated id.
fn resolve_run_trace_id_with_source(
    options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
) -> (Option<String>, TraceIdSource) {
    if !options.telemetry.enabled {
        return (None, TraceIdSource::Disabled);
    }

    if let Some(trace_id) = options
        .trace
        .context
        .as_ref()
        .and_then(|context| context.trace_id.clone())
    {
        return (Some(trace_id), TraceIdSource::ExplicitTraceId);
    }

    if let Some(trace_id) = parent_trace_context.and_then(|context| context.trace_id.clone()) {
        return (Some(trace_id), TraceIdSource::ParentTraceId);
    }

    if let Some(trace_id) = options
        .trace
        .context
        .as_ref()
        .and_then(|context| context.traceparent.as_deref())
        .and_then(trace_id_from_traceparent)
    {
        return (Some(trace_id), TraceIdSource::Traceparent);
    }

    if let Some(trace_id) = parent_trace_context
        .and_then(|context| context.traceparent.as_deref())
        .and_then(trace_id_from_traceparent)
    {
        return (Some(trace_id), TraceIdSource::ParentTraceparent);
    }

    (Some(generate_trace_id()), TraceIdSource::Generated)
}
397
398fn resolve_telemetry_context(
399 options: &YamlWorkflowRunOptions,
400 parent_trace_context: Option<&TraceContext>,
401) -> ResolvedTelemetryContext {
402 let (trace_id, trace_id_source) =
403 resolve_run_trace_id_with_source(options, parent_trace_context);
404 let sampled = trace_id
405 .as_deref()
406 .map(|value| should_sample_trace(value, options.telemetry.sample_rate))
407 .unwrap_or(false);
408
409 ResolvedTelemetryContext {
410 trace_id,
411 sampled,
412 trace_id_source,
413 }
414}
415
416fn generate_trace_id() -> String {
417 use std::time::{SystemTime, UNIX_EPOCH};
418
419 let now_nanos = SystemTime::now()
420 .duration_since(UNIX_EPOCH)
421 .map(|duration| duration.as_nanos())
422 .unwrap_or(0);
423 let sequence = u128::from(TRACE_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
424 format!("{:032x}", now_nanos ^ sequence)
425}
426
427fn workflow_metadata_with_trace(
428 options: &YamlWorkflowRunOptions,
429 trace_id: &str,
430 sampled: bool,
431 trace_id_source: TraceIdSource,
432) -> Value {
433 json!({
434 "telemetry": {
435 "trace_id": trace_id,
436 "trace_id_source": match trace_id_source {
437 TraceIdSource::Disabled => "disabled",
438 TraceIdSource::ExplicitTraceId => "explicit_trace_id",
439 TraceIdSource::ParentTraceId => "parent_trace_id",
440 TraceIdSource::Traceparent => "traceparent",
441 TraceIdSource::ParentTraceparent => "parent_traceparent",
442 TraceIdSource::Generated => "generated",
443 },
444 "enabled": options.telemetry.enabled,
445 "sampled": sampled,
446 "nerdstats": options.telemetry.nerdstats,
447 "sample_rate": options.telemetry.sample_rate,
448 "payload_mode": match options.telemetry.payload_mode {
449 YamlWorkflowPayloadMode::FullPayload => "full_payload",
450 YamlWorkflowPayloadMode::RedactedPayload => "redacted_payload",
451 },
452 "retention_days": options.telemetry.retention_days,
453 "multi_tenant": options.telemetry.multi_tenant,
454 "tool_trace_mode": match options.telemetry.tool_trace_mode {
455 YamlToolTraceMode::Full => "full",
456 YamlToolTraceMode::Redacted => "redacted",
457 YamlToolTraceMode::Off => "off",
458 },
459 },
460 "trace": {
461 "tenant": {
462 "workspace_id": options.trace.tenant.workspace_id,
463 "user_id": options.trace.tenant.user_id,
464 "conversation_id": options.trace.tenant.conversation_id,
465 "request_id": options.trace.tenant.request_id,
466 "run_id": options.trace.tenant.run_id,
467 }
468 },
469 })
470}
471
472fn apply_trace_tenant_attributes(span: &mut dyn WorkflowSpan, options: &YamlWorkflowRunOptions) {
473 if let Some(workspace_id) = options.trace.tenant.workspace_id.as_deref() {
474 span.set_attribute("tenant.workspace_id", workspace_id);
475 }
476 if let Some(user_id) = options.trace.tenant.user_id.as_deref() {
477 span.set_attribute("tenant.user_id", user_id);
478 }
479 if let Some(conversation_id) = options.trace.tenant.conversation_id.as_deref() {
480 span.set_attribute("tenant.conversation_id", conversation_id);
481 }
482 if let Some(request_id) = options.trace.tenant.request_id.as_deref() {
483 span.set_attribute("tenant.request_id", request_id);
484 }
485 if let Some(run_id) = options.trace.tenant.run_id.as_deref() {
486 span.set_attribute("tenant.run_id", run_id);
487 }
488}
489
490fn workflow_nerdstats(output: &YamlWorkflowRunOutput) -> Value {
491 let llm_nodes_without_usage: Vec<String> = output
492 .step_timings
493 .iter()
494 .filter(|step| step.node_kind == "llm_call" && step.total_tokens.is_none())
495 .map(|step| step.node_id.clone())
496 .collect();
497 let token_metrics_available = llm_nodes_without_usage.is_empty();
498 let token_metrics_source = if token_metrics_available {
499 "provider_usage"
500 } else {
501 "provider_stream_usage_unavailable"
502 };
503 let total_input_tokens = if token_metrics_available {
504 json!(output.total_input_tokens)
505 } else {
506 Value::Null
507 };
508 let total_output_tokens = if token_metrics_available {
509 json!(output.total_output_tokens)
510 } else {
511 Value::Null
512 };
513 let total_tokens = if token_metrics_available {
514 json!(output.total_tokens)
515 } else {
516 Value::Null
517 };
518 let total_reasoning_tokens = if token_metrics_available {
519 json!(output.total_reasoning_tokens)
520 } else {
521 Value::Null
522 };
523 let tokens_per_second = if token_metrics_available {
524 json!(output.tokens_per_second)
525 } else {
526 Value::Null
527 };
528
529 json!({
530 "workflow_id": output.workflow_id,
531 "terminal_node": output.terminal_node,
532 "total_elapsed_ms": output.total_elapsed_ms,
533 "ttft_ms": output.ttft_ms,
534 "step_details": output.step_timings,
535 "total_input_tokens": total_input_tokens,
536 "total_output_tokens": total_output_tokens,
537 "total_tokens": total_tokens,
538 "total_reasoning_tokens": total_reasoning_tokens,
539 "tokens_per_second": tokens_per_second,
540 "trace_id": output.trace_id,
541 "token_metrics_available": token_metrics_available,
542 "token_metrics_source": token_metrics_source,
543 "llm_nodes_without_usage": llm_nodes_without_usage,
544 })
545}
546
547fn payload_for_span(mode: YamlWorkflowPayloadMode, payload: &Value) -> String {
548 match mode {
549 YamlWorkflowPayloadMode::FullPayload => payload.to_string(),
550 YamlWorkflowPayloadMode::RedactedPayload => json!({
551 "redacted": true,
552 "value_type": match payload {
553 Value::Null => "null",
554 Value::Bool(_) => "bool",
555 Value::Number(_) => "number",
556 Value::String(_) => "string",
557 Value::Array(_) => "array",
558 Value::Object(_) => "object",
559 }
560 })
561 .to_string(),
562 }
563}
564
/// Renders a tool-call payload for the trace according to the configured
/// mode: verbatim, a type-only redaction marker (type label delegated to
/// `json_type_name`, defined elsewhere in this file), or `null` when tool
/// tracing is off.
fn payload_for_tool_trace(mode: YamlToolTraceMode, payload: &Value) -> Value {
    match mode {
        YamlToolTraceMode::Full => payload.clone(),
        YamlToolTraceMode::Redacted => json!({
            "redacted": true,
            "value_type": json_type_name(payload),
        }),
        YamlToolTraceMode::Off => Value::Null,
    }
}
575
576fn validate_json_schema(schema: &Value) -> Result<(), String> {
577 jsonschema::JSONSchema::compile(schema)
578 .map(|_| ())
579 .map_err(|error| format!("invalid JSON schema: {error}"))
580}
581
/// Validates `instance` against `schema`, compiling the schema first.
///
/// Returns an error string for either a non-compilable schema or a failing
/// instance; on validation failure only the first reported error is
/// surfaced to keep the message short.
fn validate_schema_instance(schema: &Value, instance: &Value) -> Result<(), String> {
    let validator = jsonschema::JSONSchema::compile(schema)
        .map_err(|error| format!("invalid JSON schema: {error}"))?;
    if let Err(errors) = validator.validate(instance) {
        // `validate` yields an iterator of errors; report just the first.
        let message = errors
            .into_iter()
            .next()
            .map(|error| error.to_string())
            .unwrap_or_else(|| "unknown schema validation error".to_string());
        return Err(format!("schema validation failed: {message}"));
    }
    Ok(())
}
595
596fn schema_type(schema: &Value) -> Option<&str> {
597 schema.get("type").and_then(Value::as_str)
598}
599
600fn schema_expects_object(schema: &Value) -> bool {
601 schema_type(schema) == Some("object")
602}
603
/// Converts the caller-supplied trace context input (if any) into the
/// runtime `TraceContext`, copying every propagation field verbatim.
fn trace_context_from_options(options: &YamlWorkflowRunOptions) -> Option<TraceContext> {
    options.trace.context.as_ref().map(|input| TraceContext {
        trace_id: input.trace_id.clone(),
        span_id: input.span_id.clone(),
        parent_span_id: input.parent_span_id.clone(),
        traceparent: input.traceparent.clone(),
        tracestate: input.tracestate.clone(),
        baggage: input.baggage.clone(),
    })
}
614
/// Builds the trace context handed to a custom worker by merging, per field:
/// the active span context first, then (for `trace_id` only) the resolved
/// run trace id, then the caller-supplied options context. Baggage is taken
/// wholesale from the span context when non-empty, otherwise from the
/// options context.
fn merged_trace_context_for_worker(
    span_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
    options: &YamlWorkflowRunOptions,
) -> TraceContext {
    let input_context = options.trace.context.as_ref();
    // Baggage is not merged key-by-key: span baggage wins only when it has
    // entries; an empty span baggage falls back to the input baggage.
    let baggage = if let Some(context) = span_context {
        if !context.baggage.is_empty() {
            context.baggage.clone()
        } else {
            input_context
                .map(|value| value.baggage.clone())
                .unwrap_or_default()
        }
    } else {
        input_context
            .map(|value| value.baggage.clone())
            .unwrap_or_default()
    };

    TraceContext {
        trace_id: span_context
            .and_then(|context| context.trace_id.clone())
            .or_else(|| resolved_trace_id.map(|value| value.to_string()))
            .or_else(|| input_context.and_then(|context| context.trace_id.clone())),
        span_id: span_context
            .and_then(|context| context.span_id.clone())
            .or_else(|| input_context.and_then(|context| context.span_id.clone())),
        parent_span_id: span_context
            .and_then(|context| context.parent_span_id.clone())
            .or_else(|| input_context.and_then(|context| context.parent_span_id.clone())),
        traceparent: span_context
            .and_then(|context| context.traceparent.clone())
            .or_else(|| input_context.and_then(|context| context.traceparent.clone())),
        tracestate: span_context
            .and_then(|context| context.tracestate.clone())
            .or_else(|| input_context.and_then(|context| context.tracestate.clone())),
        baggage,
    }
}
655
656fn custom_worker_context_with_trace(
657 context: &Value,
658 trace_context: &TraceContext,
659 tenant_context: &YamlWorkflowTraceTenantContext,
660) -> Value {
661 let mut context_with_trace = context.clone();
662 let Some(root) = context_with_trace.as_object_mut() else {
663 return context_with_trace;
664 };
665
666 root.insert(
667 "trace".to_string(),
668 json!({
669 "context": {
670 "trace_id": trace_context.trace_id,
671 "span_id": trace_context.span_id,
672 "parent_span_id": trace_context.parent_span_id,
673 "traceparent": trace_context.traceparent,
674 "tracestate": trace_context.tracestate,
675 "baggage": trace_context.baggage,
676 },
677 "tenant": {
678 "workspace_id": tenant_context.workspace_id,
679 "user_id": tenant_context.user_id,
680 "conversation_id": tenant_context.conversation_id,
681 "request_id": tenant_context.request_id,
682 "run_id": tenant_context.run_id,
683 }
684 }),
685 );
686
687 context_with_trace
688}
689
/// Reads `SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW` and interprets common
/// truthy spellings ("1", "true", "yes", "on" — case-insensitive, ignoring
/// surrounding whitespace). An unset or unreadable variable counts as false.
fn include_raw_stream_debug_events() -> bool {
    std::env::var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW")
        .map(|value| {
            matches!(
                value.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
699
/// Result of turning a streamed raw completion into a structured payload:
/// the parsed JSON plus, when healing was used, the parser's confidence.
#[derive(Debug)]
struct StreamedPayloadResolution {
    payload: Value,
    /// `Some` only when `JsonishParser` healing produced the payload.
    heal_confidence: Option<f32>,
}
705
/// Accumulates streamed JSON text and, once the stream completes, emits a
/// plain-text rendering exactly once (see `render_json_object_as_text`).
#[derive(Debug, Default)]
struct StreamJsonAsTextFormatter {
    // Raw JSON accumulated so far.
    raw_json: String,
    // Guards against emitting the rendered text more than once.
    emitted: bool,
}
711
712impl StreamJsonAsTextFormatter {
713 fn push(&mut self, chunk: &str) {
714 self.raw_json.push_str(chunk);
715 }
716
717 fn emit_if_ready(&mut self, complete: bool) -> Option<String> {
718 if self.emitted || !complete {
719 return None;
720 }
721 self.emitted = true;
722 Some(render_json_object_as_text(self.raw_json.as_str()))
723 }
724}
725
726fn render_json_object_as_text(raw_json: &str) -> String {
727 let value = match serde_json::from_str::<Value>(raw_json) {
728 Ok(value) => value,
729 Err(_) => return raw_json.to_string(),
730 };
731 let Some(object) = value.as_object() else {
732 return raw_json.to_string();
733 };
734
735 let mut lines = Vec::with_capacity(object.len());
736 for (key, value) in object {
737 let rendered = match value {
738 Value::String(text) => text.clone(),
739 _ => value.to_string(),
740 };
741 lines.push(format!("{key}: {rendered}"));
742 }
743 lines.join("\n")
744}
745
/// Classifies a streamed token delta: regular output text versus
/// model "thinking" text (see `StructuredJsonDeltaFilter::split`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum YamlWorkflowTokenKind {
    Output,
    Thinking,
}
752
/// Character-level state machine that tracks a streamed top-level JSON
/// object across deltas, so text outside the object can be routed as
/// "thinking" and the object itself as structured output.
#[derive(Debug, Default)]
struct StructuredJsonDeltaFilter {
    // True once the opening '{' has been seen.
    started: bool,
    // True once the top-level object has closed.
    completed: bool,
    // Current brace nesting depth (only counted outside strings).
    depth: u32,
    // True while inside a JSON string literal.
    in_string: bool,
    // True when the previous in-string character was a backslash.
    escape: bool,
}
761
impl StructuredJsonDeltaFilter {
    /// Splits a streamed text delta into `(output, thinking)` parts.
    ///
    /// Characters before the first `{` — and everything after the top-level
    /// object closes — go to the "thinking" side; the balanced JSON object
    /// itself goes to the "output" side. String literals and backslash
    /// escapes are tracked so braces inside strings do not affect nesting.
    /// Either side is `None` when this delta contributed nothing to it.
    fn split(&mut self, delta: &str) -> (Option<String>, Option<String>) {
        if delta.is_empty() {
            return (None, None);
        }

        let mut output = String::new();
        let mut thinking = String::new();

        for ch in delta.chars() {
            if self.completed {
                thinking.push(ch);
                continue;
            }

            if !self.started {
                if ch != '{' {
                    thinking.push(ch);
                    continue;
                }
                self.started = true;
                self.depth = 1;
                output.push(ch);
                continue;
            }

            output.push(ch);
            if self.in_string {
                if self.escape {
                    self.escape = false;
                    continue;
                }
                if ch == '\\' {
                    self.escape = true;
                    continue;
                }
                if ch == '"' {
                    self.in_string = false;
                }
                continue;
            }

            match ch {
                '"' => self.in_string = true,
                '{' => self.depth = self.depth.saturating_add(1),
                '}' => {
                    if self.depth > 0 {
                        self.depth -= 1;
                    }
                    if self.depth == 0 {
                        self.completed = true;
                    }
                }
                _ => {}
            }
        }

        let output = if output.is_empty() {
            None
        } else {
            Some(output)
        };
        let thinking = if thinking.is_empty() {
            None
        } else {
            Some(thinking)
        };

        (output, thinking)
    }

    /// Whether the top-level JSON object has been fully closed.
    fn completed(&self) -> bool {
        self.completed
    }
}
837
/// Returns the trimmed contents of the last ```` ```json ```` fenced block in
/// `raw`, or `None` when there is no such fence, the fence is unterminated,
/// or the block is empty after trimming.
fn extract_last_fenced_json_block(raw: &str) -> Option<&str> {
    let fence_start = raw.rfind("```json")?;
    let body = &raw[fence_start + "```json".len()..];
    let close = body.find("```")?;
    let candidate = body[..close].trim();
    (!candidate.is_empty()).then_some(candidate)
}
848
/// Scans `raw` from byte offset `start_index` and returns the shortest
/// slice that closes a balanced `{...}` object, skipping braces inside JSON
/// string literals (honoring backslash escapes). Returns `None` when a `}`
/// appears before any `{`, or when no balanced object terminates before the
/// end of input.
fn extract_balanced_object_from(raw: &str, start_index: usize) -> Option<&str> {
    let mut nesting = 0u32;
    let mut inside_string = false;
    let mut pending_escape = false;

    for (offset, ch) in raw[start_index..].char_indices() {
        if inside_string {
            if pending_escape {
                pending_escape = false;
            } else if ch == '\\' {
                pending_escape = true;
            } else if ch == '"' {
                inside_string = false;
            }
            continue;
        }

        match ch {
            '"' => inside_string = true,
            '{' => nesting = nesting.saturating_add(1),
            '}' if nesting == 0 => return None,
            '}' => {
                nesting -= 1;
                if nesting == 0 {
                    let end = start_index + offset + ch.len_utf8();
                    return Some(raw[start_index..end].trim());
                }
            }
            _ => {}
        }
    }

    None
}
889
890fn extract_last_parsable_object(raw: &str) -> Option<&str> {
891 let starts: Vec<usize> = raw
892 .char_indices()
893 .filter_map(|(index, ch)| if ch == '{' { Some(index) } else { None })
894 .collect();
895
896 for start in starts.into_iter().rev() {
897 let Some(candidate) = extract_balanced_object_from(raw, start) else {
898 continue;
899 };
900 if serde_json::from_str::<Value>(candidate).is_ok() {
901 return Some(candidate);
902 }
903 }
904
905 None
906}
907
/// Picks the best JSON candidate from raw streamed text: a fenced
/// ```` ```json ```` block wins; otherwise the last parsable `{...}` object.
fn resolve_structured_json_candidate(raw: &str) -> Option<&str> {
    extract_last_fenced_json_block(raw).or_else(|| extract_last_parsable_object(raw))
}
911
/// Parses the fully-accumulated streamed text into a structured JSON payload.
///
/// Without healing: tries strict parsing of the whole text, then of the best
/// extracted candidate (fenced block or last balanced object), failing with a
/// descriptive message otherwise. With healing: feeds the candidate (or the
/// raw text when no candidate is found) through `JsonishParser`, which also
/// reports a confidence score carried back in the resolution.
fn parse_streamed_structured_payload(
    raw: &str,
    heal: bool,
) -> Result<StreamedPayloadResolution, String> {
    if !heal {
        // Fast path: the stream may already be exactly one JSON document.
        if let Ok(payload) = serde_json::from_str::<Value>(raw) {
            return Ok(StreamedPayloadResolution {
                payload,
                heal_confidence: None,
            });
        }

        let candidate = resolve_structured_json_candidate(raw).ok_or_else(|| {
            "failed to parse streamed structured completion JSON: no JSON object candidate found"
                .to_string()
        })?;
        let payload = serde_json::from_str::<Value>(candidate).map_err(|error| {
            format!(
                "failed to parse streamed structured completion JSON: {error}; candidate={candidate}"
            )
        })?;
        return Ok(StreamedPayloadResolution {
            payload,
            heal_confidence: None,
        });
    }

    let candidate = resolve_structured_json_candidate(raw).unwrap_or(raw);
    let parser = JsonishParser::new();
    let healed = parser
        .parse(candidate)
        .map_err(|error| format!("failed to heal streamed structured completion JSON: {error}"))?;

    Ok(StreamedPayloadResolution {
        payload: healed.value,
        heal_confidence: Some(healed.confidence),
    })
}
950
/// One event emitted to a `YamlWorkflowEventSink` while a workflow runs.
/// All fields except `event_type` are optional because different event
/// types populate different subsets.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlWorkflowEvent {
    /// Discriminator for the event (string-typed; values set at emit sites).
    pub event_type: String,
    pub node_id: Option<String>,
    pub step_id: Option<String>,
    pub node_kind: Option<String>,
    pub streamable: Option<bool>,
    pub message: Option<String>,
    /// Incremental token text for streaming events.
    pub delta: Option<String>,
    /// Whether `delta` is output or "thinking" text.
    pub token_kind: Option<YamlWorkflowTokenKind>,
    pub is_terminal_node_token: Option<bool>,
    pub elapsed_ms: Option<u128>,
    pub metadata: Option<Value>,
}
965
/// Role of a workflow chat message — re-exports the agent library's `Role`.
pub type WorkflowMessageRole = Role;

/// A chat message flowing through a workflow, deserializable from either
/// snake_case or camelCase (`toolCallId`) input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WorkflowMessage {
    pub role: WorkflowMessageRole,
    pub content: String,
    /// Optional participant/tool name.
    #[serde(default)]
    pub name: Option<String>,
    /// Id linking a tool-result message to its originating tool call.
    #[serde(default, alias = "toolCallId")]
    pub tool_call_id: Option<String>,
}
977
/// Record of one template placeholder resolution: which expression was
/// bound, where its value came from, what it resolved to, and whether the
/// source value was missing.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct YamlTemplateBinding {
    /// Position of the placeholder within the template.
    pub index: usize,
    /// The placeholder expression as written.
    pub expression: String,
    /// Path into the source data the expression referred to.
    pub source_path: String,
    /// Value substituted for the placeholder.
    pub resolved: Value,
    /// JSON type label of the resolved value.
    pub resolved_type: String,
    /// True when the source path did not exist.
    pub missing: bool,
}
987
/// Severity of a workflow validation diagnostic; only `Error` entries cause
/// `YamlWorkflowRunError::Validation`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum YamlWorkflowDiagnosticSeverity {
    Error,
    Warning,
}
993
/// One finding produced while validating a workflow definition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct YamlWorkflowDiagnostic {
    /// Offending node, when the finding is node-specific.
    pub node_id: Option<String>,
    /// Stable machine-readable code for the finding.
    pub code: String,
    pub severity: YamlWorkflowDiagnosticSeverity,
    /// Human-readable description.
    pub message: String,
}
1001
/// Everything that can go wrong while loading, validating, or executing a
/// YAML workflow. Display strings come from the `thiserror` attributes.
#[derive(Debug, Error)]
pub enum YamlWorkflowRunError {
    #[error("failed to read workflow yaml '{path}': {source}")]
    Read {
        path: String,
        source: std::io::Error,
    },
    #[error("failed to parse workflow yaml '{path}': {source}")]
    Parse {
        path: String,
        source: serde_yaml::Error,
    },
    #[error("workflow '{workflow_id}' has no nodes")]
    EmptyNodes { workflow_id: String },
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("unknown node id '{node_id}'")]
    MissingNode { node_id: String },
    #[error("unsupported node type in '{node_id}'")]
    UnsupportedNodeType { node_id: String },
    #[error("unsupported switch condition format: {condition}")]
    UnsupportedCondition { condition: String },
    #[error("switch node '{node_id}' has no valid next target")]
    InvalidSwitchTarget { node_id: String },
    #[error("llm returned non-object payload for node '{node_id}'")]
    LlmPayloadNotObject { node_id: String },
    #[error("custom worker handler '{handler}' is not supported")]
    UnsupportedCustomHandler { handler: String },
    #[error("llm execution failed for node '{node_id}': {message}")]
    Llm { node_id: String, message: String },
    #[error("custom worker execution failed for node '{node_id}': {message}")]
    CustomWorker { node_id: String, message: String },
    /// Validation failed; individual findings are carried in `diagnostics`.
    #[error("workflow validation failed with {diagnostics_count} error(s)")]
    Validation {
        diagnostics_count: usize,
        diagnostics: Vec<YamlWorkflowDiagnostic>,
    },
    #[error("invalid workflow input: {message}")]
    InvalidInput { message: String },
    #[error("ir runtime execution failed: {message}")]
    IrRuntime { message: String },
    /// The event sink requested cancellation mid-run.
    #[error("workflow event stream cancelled: {message}")]
    EventSinkCancelled { message: String },
}
1046
/// Receiver for workflow execution events. Implementations may also signal
/// cancellation; the default never cancels.
pub trait YamlWorkflowEventSink: Send + Sync {
    /// Called for every emitted workflow event.
    fn emit(&self, event: &YamlWorkflowEvent);

    /// When true, the run should stop (see `event_sink_is_cancelled`).
    fn is_cancelled(&self) -> bool {
        false
    }
}
1054
/// Event sink that discards all events and never cancels.
pub struct NoopYamlWorkflowEventSink;

impl YamlWorkflowEventSink for NoopYamlWorkflowEventSink {
    fn emit(&self, _event: &YamlWorkflowEvent) {}
}
1060
/// Canonical message used when a run is aborted because the event sink
/// reported cancellation.
fn workflow_event_sink_cancelled_message() -> &'static str {
    "workflow event callback cancelled"
}
1064
1065fn event_sink_is_cancelled(event_sink: Option<&dyn YamlWorkflowEventSink>) -> bool {
1066 event_sink.map(|sink| sink.is_cancelled()).unwrap_or(false)
1067}
1068
/// Reasons a YAML workflow cannot be lowered to the IR representation.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum YamlToIrError {
    #[error("entry node '{entry_node}' does not exist")]
    MissingEntry { entry_node: String },
    #[error("node '{node_id}' has multiple outgoing edges in YAML; IR llm/tool nodes require one")]
    MultipleOutgoingEdge { node_id: String },
    #[error("node '{node_id}' is unsupported for IR conversion: {reason}")]
    UnsupportedNode { node_id: String, reason: String },
}
1078
/// Renders a workflow as a Mermaid flowchart: prefers the IR-based renderer,
/// falling back to a direct YAML rendering when IR conversion fails.
pub fn yaml_workflow_to_mermaid(workflow: &YamlWorkflow) -> String {
    if let Ok(ir) = yaml_workflow_to_ir(workflow) {
        return workflow_to_mermaid(&ir);
    }

    yaml_workflow_to_mermaid_fallback(workflow)
}
1087
/// Renders the YAML workflow directly as Mermaid when IR lowering is not
/// possible (e.g. the workflow uses features `yaml_workflow_to_ir` rejects).
fn yaml_workflow_to_mermaid_fallback(workflow: &YamlWorkflow) -> String {
    let mut lines = Vec::new();
    lines.push("flowchart TD".to_string());
    let mut tool_node_ids: Vec<String> = Vec::new();

    // One rectangular node per workflow node; llm_call tools become rounded
    // satellite nodes attached with dotted edges.
    for node in &workflow.nodes {
        let label = format!("{}\\n({})", node.id, node.kind_name());
        lines.push(format!(
            " {}[\"{}\"]",
            sanitize_mermaid_id(&node.id),
            escape_mermaid_label(label.as_str()),
        ));

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool_name) in llm_tool_names(llm).into_iter().enumerate() {
                let tool_id = sanitize_mermaid_id(format!("{}__tool_{}", node.id, idx).as_str());
                lines.push(format!(
                    " {}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                lines.push(format!(
                    " {} -.-> {}",
                    sanitize_mermaid_id(&node.id),
                    tool_id
                ));
                tool_node_ids.push(tool_id);
            }
        }
    }

    // Edges are collected as (from, label, to) triples; plain edges carry an
    // empty label. The set dedupes; sorting below makes output deterministic.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    // Switch nodes contribute one labeled edge per branch condition plus a
    // "default" edge.
    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        if label.is_empty() {
            lines.push(format!(
                " {} --> {}",
                sanitize_mermaid_id(&from),
                sanitize_mermaid_id(&to)
            ));
        } else {
            lines.push(format!(
                " {} -- \"{}\" --> {}",
                sanitize_mermaid_id(&from),
                escape_mermaid_label(&label),
                sanitize_mermaid_id(&to)
            ));
        }
    }

    // Shared styling for tool satellite nodes, only emitted when any exist.
    if !tool_node_ids.is_empty() {
        lines.push(" classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!(" class {} toolNode;", tool_node_ids.join(",")));
    }

    lines.join("\n")
}
1169
1170fn llm_tool_names(llm: &YamlLlmCall) -> Vec<String> {
1171 llm.tools
1172 .iter()
1173 .map(|tool| match tool {
1174 YamlToolDeclaration::OpenAi(openai) => openai.function.name.clone(),
1175 YamlToolDeclaration::Simplified(simple) => simple.name.clone(),
1176 })
1177 .collect()
1178}
1179
/// Reads a workflow YAML file and renders it as Mermaid, inlining any sibling
/// workflows it references as subgraphs.
///
/// # Errors
/// Returns `Read` when the file (or its directory, during sibling discovery)
/// cannot be read, and `Parse` when any YAML fails to deserialize.
pub fn yaml_workflow_file_to_mermaid(workflow_path: &Path) -> Result<String, YamlWorkflowRunError> {
    let contents =
        std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
            path: workflow_path.display().to_string(),
            source,
        })?;

    let workflow: YamlWorkflow =
        serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
            path: workflow_path.display().to_string(),
            source,
        })?;

    // Sibling YAML files referenced via run_workflow_graph tools; when none
    // are found, render the single workflow on its own.
    let referenced_subgraphs = discover_referenced_subgraphs(workflow_path, &workflow)?;
    if referenced_subgraphs.is_empty() {
        return Ok(yaml_workflow_to_mermaid(&workflow));
    }

    Ok(yaml_workflow_to_mermaid_with_subgraphs(
        &workflow,
        &referenced_subgraphs,
    ))
}
1204
/// A sibling workflow referenced by the main workflow, paired with the alias
/// (the raw `workflow_id` string) under which it was referenced.
#[derive(Debug, Clone)]
struct MermaidSubgraphWorkflow {
    // Raw workflow id as written in the referencing tool schema or prompt.
    alias: String,
    workflow: YamlWorkflow,
}
1210
/// Mermaid statements rendered for one workflow block (main graph or a
/// subgraph), plus bookkeeping used for styling and cross-graph wiring.
#[derive(Debug, Default)]
struct MermaidBlockRender {
    // Node/edge statements without the `flowchart` header or outer indent.
    lines: Vec<String>,
    // Every tool node id, for the shared `toolNode` classDef.
    tool_node_ids: Vec<String>,
    // Tool nodes named `run_workflow_graph`; they get dotted edges into
    // subgraph entry nodes.
    run_workflow_tool_node_ids: Vec<String>,
    // Prefixed Mermaid id of this workflow's entry node.
    entry_node_id: String,
}
1218
/// Renders the main workflow and its referenced sibling workflows as one
/// Mermaid flowchart, each inside its own `subgraph` cluster.
fn yaml_workflow_to_mermaid_with_subgraphs(
    workflow: &YamlWorkflow,
    subgraphs: &[MermaidSubgraphWorkflow],
) -> String {
    let mut lines = vec!["flowchart TD".to_string()];

    let main_block = render_mermaid_block(workflow, "main");
    lines.push(format!(
        " subgraph main_graph[\"Main: {}\"]",
        escape_mermaid_label(&workflow.id)
    ));
    for line in &main_block.lines {
        lines.push(format!(" {line}"));
    }
    lines.push(" end".to_string());

    let mut all_tool_nodes = main_block.tool_node_ids.clone();

    for (index, subgraph) in subgraphs.iter().enumerate() {
        let block_id = format!("subgraph_{}", index + 1);
        let block = render_mermaid_block(&subgraph.workflow, &block_id);
        lines.push(format!(
            " subgraph {}[\"Subgraph: {}\"]",
            sanitize_mermaid_id(&format!("{}_cluster", block_id)),
            escape_mermaid_label(&subgraph.alias)
        ));
        for line in &block.lines {
            lines.push(format!(" {line}"));
        }
        lines.push(" end".to_string());

        // NOTE(review): every run_workflow_graph tool node in the MAIN block
        // is linked to EVERY subgraph entry, even when the tool's schema names
        // a different workflow — presumably a deliberate approximation for the
        // visualization; confirm.
        for tool_node in &main_block.run_workflow_tool_node_ids {
            lines.push(format!(
                " {} -. \"{}\" .-> {}",
                tool_node,
                escape_mermaid_label(&format!("calls {}", subgraph.alias)),
                block.entry_node_id
            ));
        }

        all_tool_nodes.extend(block.tool_node_ids);
    }

    // Shared styling for all tool satellite nodes across main + subgraphs.
    if !all_tool_nodes.is_empty() {
        lines.push(" classDef toolNode fill:#FFF4D6,stroke:#D97706,color:#7C2D12;".to_string());
        lines.push(format!(" class {} toolNode;", all_tool_nodes.join(",")));
    }

    lines.join("\n")
}
1269
/// Renders one workflow's nodes and edges as Mermaid statements with every
/// node id namespaced under `prefix`, so multiple workflows can coexist in a
/// single flowchart.
fn render_mermaid_block(workflow: &YamlWorkflow, prefix: &str) -> MermaidBlockRender {
    let mut block = MermaidBlockRender {
        entry_node_id: prefixed_mermaid_id(prefix, &workflow.entry_node),
        ..Default::default()
    };

    for node in &workflow.nodes {
        let node_id = prefixed_mermaid_id(prefix, &node.id);
        let label = format!("{}\\n({})", node.id, node.kind_name());
        block
            .lines
            .push(format!("{}[\"{}\"]", node_id, escape_mermaid_label(&label)));

        // llm_call tools become rounded satellite nodes with dotted edges;
        // run_workflow_graph tools are additionally recorded so the caller
        // can wire them to subgraph entry nodes.
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for (idx, tool) in llm.tools.iter().enumerate() {
                let tool_name = tool_declaration_name(tool);
                let tool_id = prefixed_mermaid_id(prefix, &format!("{}__tool_{}", node.id, idx));
                block.lines.push(format!(
                    "{}([\"{}\"])",
                    tool_id,
                    escape_mermaid_label(format!("tool: {tool_name}").as_str())
                ));
                block.lines.push(format!("{} -.-> {}", node_id, tool_id));
                block.tool_node_ids.push(tool_id.clone());

                if tool_name == "run_workflow_graph" {
                    block.run_workflow_tool_node_ids.push(tool_id);
                }
            }
        }
    }

    // Edges are deduplicated as (from, label, to); plain edges use an empty
    // label, switch branches carry their condition text plus a "default" edge.
    let mut emitted: HashSet<(String, String, String)> = HashSet::new();

    for edge in &workflow.edges {
        emitted.insert((edge.from.clone(), String::new(), edge.to.clone()));
    }

    for node in &workflow.nodes {
        if let Some(switch) = node.node_type.switch.as_ref() {
            for branch in &switch.branches {
                emitted.insert((
                    node.id.clone(),
                    branch.condition.clone(),
                    branch.target.clone(),
                ));
            }
            emitted.insert((
                node.id.clone(),
                "default".to_string(),
                switch.default.clone(),
            ));
        }
    }

    // Sort for deterministic output.
    let mut edges = emitted.into_iter().collect::<Vec<_>>();
    edges.sort();

    for (from, label, to) in edges {
        let from_id = prefixed_mermaid_id(prefix, &from);
        let to_id = prefixed_mermaid_id(prefix, &to);
        if label.is_empty() {
            block.lines.push(format!("{} --> {}", from_id, to_id));
        } else {
            block.lines.push(format!(
                "{} -- \"{}\" --> {}",
                from_id,
                escape_mermaid_label(&label),
                to_id
            ));
        }
    }

    block
}
1345
/// Finds sibling workflow files matching the workflow ids referenced by this
/// workflow's `run_workflow_graph` tools/prompts.
///
/// Lookup is keyed on the normalized id (ASCII-lowercased, `-` → `_`); the
/// first sibling matching each referenced id wins.
fn discover_referenced_subgraphs(
    workflow_path: &Path,
    workflow: &YamlWorkflow,
) -> Result<Vec<MermaidSubgraphWorkflow>, YamlWorkflowRunError> {
    let workflow_ids = referenced_workflow_ids(workflow);
    if workflow_ids.is_empty() {
        return Ok(Vec::new());
    }

    let parent_dir = workflow_path.parent().unwrap_or(Path::new("."));
    let sibling_workflows = load_yaml_sibling_workflows(parent_dir, workflow_path)?;

    let mut discovered = Vec::new();
    let mut seen = HashSet::new();

    for workflow_id in workflow_ids {
        let normalized = normalize_workflow_lookup_key(&workflow_id);
        if seen.contains(&normalized) {
            continue;
        }

        // `seen` is only updated on a successful lookup, so an id with no
        // matching sibling is simply skipped (and retried if repeated, with
        // the same outcome).
        if let Some((_, subworkflow)) = sibling_workflows.iter().find(|(key, _)| key == &normalized)
        {
            discovered.push(MermaidSubgraphWorkflow {
                alias: workflow_id.clone(),
                workflow: subworkflow.clone(),
            });
            seen.insert(normalized);
        }
    }

    Ok(discovered)
}
1379
/// Loads every other YAML file in `parent_dir` and indexes each one under two
/// normalized keys: its file stem and its declared workflow `id`.
///
/// # Errors
/// Fails on directory/file read errors or on a YAML parse error of any
/// sibling file.
fn load_yaml_sibling_workflows(
    parent_dir: &Path,
    workflow_path: &Path,
) -> Result<Vec<(String, YamlWorkflow)>, YamlWorkflowRunError> {
    let mut results = Vec::new();
    let entries = std::fs::read_dir(parent_dir).map_err(|source| YamlWorkflowRunError::Read {
        path: parent_dir.display().to_string(),
        source,
    })?;

    for entry in entries {
        let entry = entry.map_err(|source| YamlWorkflowRunError::Read {
            path: parent_dir.display().to_string(),
            source,
        })?;
        let path = entry.path();
        if !is_yaml_file(&path) {
            continue;
        }

        // NOTE(review): exact PathBuf comparison; if `workflow_path` was given
        // in a different form than the directory entry (relative vs absolute),
        // the main file could be re-loaded as its own sibling — confirm
        // callers' inputs.
        if path == workflow_path {
            continue;
        }

        let contents =
            std::fs::read_to_string(&path).map_err(|source| YamlWorkflowRunError::Read {
                path: path.display().to_string(),
                source,
            })?;
        let subworkflow: YamlWorkflow =
            serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
                path: path.display().to_string(),
                source,
            })?;

        // Index by file stem and by declared id so either may be referenced.
        if let Some(stem) = path.file_stem().and_then(|value| value.to_str()) {
            results.push((normalize_workflow_lookup_key(stem), subworkflow.clone()));
        }
        results.push((normalize_workflow_lookup_key(&subworkflow.id), subworkflow));
    }

    Ok(results)
}
1423
/// Collects every workflow id referenced through `run_workflow_graph` tools,
/// scanning both the tool's JSON schema and the node's prompt text.
///
/// Ids are deduplicated on their normalized key but returned with their
/// original spelling, in first-seen order.
fn referenced_workflow_ids(workflow: &YamlWorkflow) -> Vec<String> {
    let mut ids = Vec::new();
    let mut seen = HashSet::new();

    for node in &workflow.nodes {
        let prompt = node
            .config
            .as_ref()
            .and_then(|config| config.prompt.as_deref());

        if let Some(llm) = node.node_type.llm_call.as_ref() {
            for tool in &llm.tools {
                // Only run_workflow_graph tools can reference sibling workflows.
                if tool_declaration_name(tool) != "run_workflow_graph" {
                    continue;
                }

                for workflow_id in referenced_workflow_ids_from_tool(tool) {
                    let normalized = normalize_workflow_lookup_key(&workflow_id);
                    if seen.insert(normalized) {
                        ids.push(workflow_id);
                    }
                }

                // The prompt is rescanned once per run_workflow_graph tool on
                // this node; `seen` keeps the result deduplicated regardless.
                if let Some(prompt_text) = prompt {
                    for workflow_id in referenced_workflow_ids_from_prompt(prompt_text) {
                        let normalized = normalize_workflow_lookup_key(&workflow_id);
                        if seen.insert(normalized) {
                            ids.push(workflow_id);
                        }
                    }
                }
            }
        }
    }

    ids
}
1461
1462fn referenced_workflow_ids_from_tool(tool: &YamlToolDeclaration) -> Vec<String> {
1463 let schema = match tool {
1464 YamlToolDeclaration::OpenAi(openai) => openai.function.parameters.as_ref(),
1465 YamlToolDeclaration::Simplified(simple) => Some(&simple.input_schema),
1466 };
1467
1468 let mut ids = Vec::new();
1469 if let Some(schema) = schema {
1470 if let Some(workflow_prop) = schema
1471 .get("properties")
1472 .and_then(Value::as_object)
1473 .and_then(|properties| properties.get("workflow_id"))
1474 {
1475 if let Some(value) = workflow_prop.get("const").and_then(Value::as_str) {
1476 ids.push(value.to_string());
1477 }
1478
1479 if let Some(enum_values) = workflow_prop.get("enum").and_then(Value::as_array) {
1480 for value in enum_values {
1481 if let Some(value) = value.as_str() {
1482 ids.push(value.to_string());
1483 }
1484 }
1485 }
1486 }
1487 }
1488
1489 ids
1490}
1491
/// Scans free-form prompt text for `"workflow_id": "<id>"` occurrences and
/// returns the quoted ids in order of appearance.
///
/// This is a lightweight textual scan, not JSON parsing: after each
/// `"workflow_id"` key a colon is expected, then a double-quoted value.
/// Values that are empty after trimming are skipped.
fn referenced_workflow_ids_from_prompt(prompt: &str) -> Vec<String> {
    const KEY: &str = "\"workflow_id\"";

    let mut found = Vec::new();
    let mut cursor = prompt;

    loop {
        let Some(key_index) = cursor.find(KEY) else {
            break;
        };
        let after_key = &cursor[key_index + KEY.len()..];
        let Some(colon_index) = after_key.find(':') else {
            break;
        };
        let after_colon = &after_key[colon_index + 1..];

        let candidate = after_colon.trim_start();
        if let Some(quoted) = candidate.strip_prefix('"') {
            if let Some(close) = quoted.find('"') {
                let id = quoted[..close].trim();
                if !id.is_empty() {
                    found.push(id.to_string());
                }
                cursor = &quoted[close + 1..];
                continue;
            }
        }

        // No quoted value after the colon: resume scanning just past it.
        cursor = after_colon;
    }

    found
}
1519
/// Canonicalizes a workflow id for lookup: ASCII letters are lowercased and
/// hyphens become underscores; all other characters pass through unchanged.
fn normalize_workflow_lookup_key(value: &str) -> String {
    let mut key = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '-' {
            key.push('_');
        } else {
            key.push(ch.to_ascii_lowercase());
        }
    }
    key
}
1529
/// Returns `true` when the path has a literal `yaml` or `yml` extension
/// (case-sensitive, matching the original behavior).
fn is_yaml_file(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext == "yaml" || ext == "yml",
        None => false,
    }
}
1536
1537fn prefixed_mermaid_id(prefix: &str, id: &str) -> String {
1538 sanitize_mermaid_id(format!("{}__{}", prefix, id).as_str())
1539}
1540
1541fn tool_declaration_name(tool: &YamlToolDeclaration) -> &str {
1542 match tool {
1543 YamlToolDeclaration::OpenAi(openai) => &openai.function.name,
1544 YamlToolDeclaration::Simplified(simple) => &simple.name,
1545 }
1546}
1547
/// Lowers a YAML workflow into the canonical IR `WorkflowDefinition`.
///
/// A synthetic start node (`__yaml_start`) is prepended pointing at the YAML
/// entry node. `llm_call` nodes become IR tool nodes invoking the built-in
/// `__yaml_llm_call` tool, `custom_worker` nodes become tool nodes for their
/// handler, and `switch` nodes become routers.
///
/// # Errors
/// - [`YamlToIrError::MissingEntry`] when `entry_node` names no node.
/// - [`YamlToIrError::MultipleOutgoingEdge`] when an llm/worker node fans out.
/// - [`YamlToIrError::UnsupportedNode`] for globals mutation, llm_call tools,
///   or unrecognized node types, none of which the IR can represent yet.
pub fn yaml_workflow_to_ir(workflow: &YamlWorkflow) -> Result<WorkflowDefinition, YamlToIrError> {
    let known_ids: HashSet<&str> = workflow.nodes.iter().map(|n| n.id.as_str()).collect();
    if !known_ids.contains(workflow.entry_node.as_str()) {
        return Err(YamlToIrError::MissingEntry {
            entry_node: workflow.entry_node.clone(),
        });
    }

    // Adjacency list of plain YAML edges, used to resolve each node's `next`.
    let mut outgoing: HashMap<&str, Vec<&str>> = HashMap::new();
    for edge in &workflow.edges {
        outgoing
            .entry(edge.from.as_str())
            .or_default()
            .push(edge.to.as_str());
    }

    let mut nodes = Vec::with_capacity(workflow.nodes.len() + 1);
    // Synthetic entry point required by the IR.
    nodes.push(Node {
        id: YAML_START_NODE_ID.to_string(),
        kind: NodeKind::Start {
            next: workflow.entry_node.clone(),
        },
    });

    for node in &workflow.nodes {
        if let Some(llm) = node.node_type.llm_call.as_ref() {
            // Globals mutation has no IR representation; fail loudly rather
            // than silently dropping it.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            if !llm.tools.is_empty() {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "llm_call.tools are not represented in canonical IR llm nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            // llm_call lowers to a tool node invoking the built-in LLM tool,
            // with the node's configuration embedded as the tool input.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: YAML_LLM_TOOL_ID.to_string(),
                    input: json!({
                        "node_id": node.id,
                        "model": llm.model,
                        "prompt_template": node
                            .config
                            .as_ref()
                            .and_then(|c| c.prompt.clone())
                            .unwrap_or_default(),
                        "stream": llm.stream.unwrap_or(false),
                        "stream_json_as_text": llm.stream_json_as_text.unwrap_or(false),
                        "heal": llm.heal.unwrap_or(false),
                        "messages_path": llm.messages_path,
                        "append_prompt_as_user": llm.append_prompt_as_user.unwrap_or(true),
                        "output_schema": node
                            .config
                            .as_ref()
                            .and_then(|c| c.output_schema.clone())
                            .unwrap_or_else(default_llm_output_schema),
                    }),
                    next,
                },
            });
            continue;
        }

        if let Some(worker) = node.node_type.custom_worker.as_ref() {
            // Same globals restriction as llm_call nodes.
            if node
                .config
                .as_ref()
                .and_then(|c| c.set_globals.as_ref())
                .is_some()
                || node
                    .config
                    .as_ref()
                    .and_then(|c| c.update_globals.as_ref())
                    .is_some()
            {
                return Err(YamlToIrError::UnsupportedNode {
                    node_id: node.id.clone(),
                    reason: "set_globals/update_globals are not represented in canonical IR tool nodes yet"
                        .to_string(),
                });
            }

            let next = single_next_for_node(&outgoing, &node.id)?;
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Tool {
                    tool: worker.handler.clone(),
                    // Missing payload defaults to an empty JSON object.
                    input: node
                        .config
                        .as_ref()
                        .and_then(|c| c.payload.clone())
                        .unwrap_or_else(|| json!({})),
                    next,
                },
            });
            continue;
        }

        if let Some(switch) = node.node_type.switch.as_ref() {
            // Switch branches map 1:1 onto router routes; branch conditions
            // are rewritten from YAML paths to IR paths.
            nodes.push(Node {
                id: node.id.clone(),
                kind: NodeKind::Router {
                    routes: switch
                        .branches
                        .iter()
                        .map(|b| RouterRoute {
                            when: rewrite_yaml_condition_to_ir(&b.condition),
                            next: b.target.clone(),
                        })
                        .collect(),
                    default: switch.default.clone(),
                },
            });
            continue;
        }

        return Err(YamlToIrError::UnsupportedNode {
            node_id: node.id.clone(),
            reason: "node_type must be llm_call, switch, or custom_worker".to_string(),
        });
    }

    Ok(WorkflowDefinition {
        version: WORKFLOW_IR_V0.to_string(),
        name: workflow.id.clone(),
        nodes,
    })
}
1695
1696fn single_next_for_node(
1697 outgoing: &HashMap<&str, Vec<&str>>,
1698 node_id: &str,
1699) -> Result<Option<String>, YamlToIrError> {
1700 match outgoing.get(node_id) {
1701 None => Ok(None),
1702 Some(targets) if targets.len() == 1 => Ok(Some(targets[0].to_string())),
1703 Some(_) => Err(YamlToIrError::MultipleOutgoingEdge {
1704 node_id: node_id.to_string(),
1705 }),
1706 }
1707}
1708
/// Rewrites a YAML switch condition path into its canonical-IR equivalent:
/// `$.nodes.` becomes `$.node_outputs.` and the `.output` segment is dropped,
/// both mid-path (`.output.` → `.`) and as a trailing suffix.
fn rewrite_yaml_condition_to_ir(expr: &str) -> String {
    let rewritten = expr
        .replace("$.nodes.", "$.node_outputs.")
        .replace(".output.", ".");
    match rewritten.strip_suffix(".output") {
        Some(trimmed) => trimmed.to_string(),
        None => rewritten,
    }
}
1719
/// Sanitizes an arbitrary node id into a Mermaid-safe identifier.
///
/// Every character outside `[A-Za-z0-9_]` becomes `_`, and ids that do not
/// start with an ASCII letter or `_` (including the empty id) are prefixed
/// with `n_` so the result always starts with a valid identifier character.
fn sanitize_mermaid_id(id: &str) -> String {
    let starts_valid = id
        .chars()
        .next()
        .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');

    // Build the result in a single pass; the original allocated an
    // intermediate String and then re-collected it into a second one.
    let mut out = String::with_capacity(id.len() + 2);
    if !starts_valid {
        out.push_str("n_");
    }
    out.extend(id.chars().map(|ch| {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            ch
        } else {
            '_'
        }
    }));
    out
}
1743
/// Escapes double quotes so the label can be embedded inside a quoted Mermaid
/// label without terminating it early.
fn escape_mermaid_label(label: &str) -> String {
    let mut escaped = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch == '"' {
            escaped.push_str("\\\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
1747
/// Fully-resolved inputs for executing one `llm_call` node, handed to a
/// `YamlWorkflowLlmExecutor` implementation.
#[derive(Debug, Clone)]
pub struct YamlLlmExecutionRequest {
    // Id of the YAML node this request was built from.
    pub node_id: String,
    // NOTE(review): presumably true when the node has no outgoing edge —
    // confirm against the request builder.
    pub is_terminal_node: bool,
    pub stream_json_as_text: bool,
    pub model: String,
    // Pre-built conversation; when `None` the executor constructs a default
    // system+user message pair from `prompt`.
    pub messages: Option<Vec<Message>>,
    // When true and `prompt` is non-empty, the prompt is appended to
    // `messages` as a user turn.
    pub append_prompt_as_user: bool,
    pub prompt: String,
    pub prompt_template: String,
    pub prompt_bindings: Vec<YamlTemplateBinding>,
    // JSON schema for the step's structured output.
    pub schema: Value,
    pub stream: bool,
    pub heal: bool,
    // Tools offered to the model; empty means a plain completion.
    pub tools: Vec<YamlResolvedTool>,
    pub tool_choice: Option<ToolChoice>,
    // Upper bound on model<->tool roundtrips (loop runs 0..=max).
    pub max_tool_roundtrips: u8,
    pub tool_calls_global_key: Option<String>,
    pub tool_trace_mode: YamlToolTraceMode,
    pub execution_context: Value,
    pub email_text: String,
    // Optional tracing correlation data.
    pub trace_id: Option<String>,
    pub trace_context: Option<TraceContext>,
    pub trace_sampled: bool,
}
1773
/// A tool made available to the model, paired with an optional JSON schema
/// describing the tool's output.
#[derive(Debug, Clone)]
pub struct YamlResolvedTool {
    pub definition: ToolDefinition,
    pub output_schema: Option<Value>,
}
1779
/// Executes one `llm_call` step of a YAML workflow.
///
/// Errors are surfaced as plain strings; the optional event sink receives
/// progress events during execution.
#[async_trait]
pub trait YamlWorkflowLlmExecutor: Send + Sync {
    async fn complete_structured(
        &self,
        request: YamlLlmExecutionRequest,
        event_sink: Option<&dyn YamlWorkflowEventSink>,
    ) -> Result<YamlLlmExecutionResult, String>;
}
1788
/// Executes `custom_worker` nodes, dispatched by handler name.
#[async_trait]
pub trait YamlWorkflowCustomWorkerExecutor: Send + Sync {
    /// Runs `handler` with the node `payload` plus the workflow's email text
    /// and execution context; returns the worker's JSON output or an error
    /// message.
    async fn execute(
        &self,
        handler: &str,
        payload: &Value,
        email_text: &str,
        context: &Value,
    ) -> Result<Value, String>;
}
1799
1800pub async fn run_workflow_yaml_file(
1801 workflow_path: &Path,
1802 workflow_input: &Value,
1803 executor: &dyn YamlWorkflowLlmExecutor,
1804) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1805 let contents =
1806 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1807 path: workflow_path.display().to_string(),
1808 source,
1809 })?;
1810
1811 let workflow: YamlWorkflow =
1812 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1813 path: workflow_path.display().to_string(),
1814 source,
1815 })?;
1816
1817 run_workflow_yaml(&workflow, workflow_input, executor).await
1818}
1819
1820pub async fn run_email_workflow_yaml_file(
1821 workflow_path: &Path,
1822 email_text: &str,
1823 executor: &dyn YamlWorkflowLlmExecutor,
1824) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1825 let workflow_input = json!({ "email_text": email_text });
1826 run_workflow_yaml_file(workflow_path, &workflow_input, executor).await
1827}
1828
1829pub async fn run_workflow_yaml_file_with_client(
1830 workflow_path: &Path,
1831 workflow_input: &Value,
1832 client: &SimpleAgentsClient,
1833) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1834 let contents =
1835 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1836 path: workflow_path.display().to_string(),
1837 source,
1838 })?;
1839
1840 let workflow: YamlWorkflow =
1841 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1842 path: workflow_path.display().to_string(),
1843 source,
1844 })?;
1845
1846 run_workflow_yaml_with_client(&workflow, workflow_input, client).await
1847}
1848
1849pub async fn run_email_workflow_yaml_file_with_client(
1850 workflow_path: &Path,
1851 email_text: &str,
1852 client: &SimpleAgentsClient,
1853) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1854 let workflow_input = json!({ "email_text": email_text });
1855 run_workflow_yaml_file_with_client(workflow_path, &workflow_input, client).await
1856}
1857
/// Executes an in-memory workflow through a SimpleAgents client, with no
/// custom worker executor.
pub async fn run_workflow_yaml_with_client(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    client: &SimpleAgentsClient,
) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
    run_workflow_yaml_with_client_and_custom_worker(workflow, workflow_input, client, None).await
}
1865
1866pub async fn run_email_workflow_yaml_with_client(
1867 workflow: &YamlWorkflow,
1868 email_text: &str,
1869 client: &SimpleAgentsClient,
1870) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1871 let workflow_input = json!({ "email_text": email_text });
1872 run_workflow_yaml_with_client(workflow, &workflow_input, client).await
1873}
1874
1875pub async fn run_workflow_yaml_file_with_client_and_custom_worker(
1876 workflow_path: &Path,
1877 workflow_input: &Value,
1878 client: &SimpleAgentsClient,
1879 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1880) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1881 let contents =
1882 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1883 path: workflow_path.display().to_string(),
1884 source,
1885 })?;
1886
1887 let workflow: YamlWorkflow =
1888 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1889 path: workflow_path.display().to_string(),
1890 source,
1891 })?;
1892
1893 run_workflow_yaml_with_client_and_custom_worker(
1894 &workflow,
1895 workflow_input,
1896 client,
1897 custom_worker,
1898 )
1899 .await
1900}
1901
1902pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker(
1903 workflow_path: &Path,
1904 email_text: &str,
1905 client: &SimpleAgentsClient,
1906 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1907) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1908 let workflow_input = json!({ "email_text": email_text });
1909 run_workflow_yaml_file_with_client_and_custom_worker(
1910 workflow_path,
1911 &workflow_input,
1912 client,
1913 custom_worker,
1914 )
1915 .await
1916}
1917
1918pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1919 workflow_path: &Path,
1920 workflow_input: &Value,
1921 client: &SimpleAgentsClient,
1922 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1923 event_sink: Option<&dyn YamlWorkflowEventSink>,
1924) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1925 run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1926 workflow_path,
1927 workflow_input,
1928 client,
1929 custom_worker,
1930 event_sink,
1931 &YamlWorkflowRunOptions::default(),
1932 )
1933 .await
1934}
1935
1936pub async fn run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
1937 workflow_path: &Path,
1938 workflow_input: &Value,
1939 client: &SimpleAgentsClient,
1940 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1941 event_sink: Option<&dyn YamlWorkflowEventSink>,
1942 options: &YamlWorkflowRunOptions,
1943) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1944 let contents =
1945 std::fs::read_to_string(workflow_path).map_err(|source| YamlWorkflowRunError::Read {
1946 path: workflow_path.display().to_string(),
1947 source,
1948 })?;
1949
1950 let workflow: YamlWorkflow =
1951 serde_yaml::from_str(&contents).map_err(|source| YamlWorkflowRunError::Parse {
1952 path: workflow_path.display().to_string(),
1953 source,
1954 })?;
1955
1956 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1957 &workflow,
1958 workflow_input,
1959 client,
1960 custom_worker,
1961 event_sink,
1962 options,
1963 )
1964 .await
1965}
1966
1967pub async fn run_email_workflow_yaml_file_with_client_and_custom_worker_and_events(
1968 workflow_path: &Path,
1969 email_text: &str,
1970 client: &SimpleAgentsClient,
1971 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1972 event_sink: Option<&dyn YamlWorkflowEventSink>,
1973) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1974 let workflow_input = json!({ "email_text": email_text });
1975 run_workflow_yaml_file_with_client_and_custom_worker_and_events(
1976 workflow_path,
1977 &workflow_input,
1978 client,
1979 custom_worker,
1980 event_sink,
1981 )
1982 .await
1983}
1984
1985pub async fn run_workflow_yaml_with_client_and_custom_worker(
1986 workflow: &YamlWorkflow,
1987 workflow_input: &Value,
1988 client: &SimpleAgentsClient,
1989 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
1990) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
1991 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
1992 workflow,
1993 workflow_input,
1994 client,
1995 custom_worker,
1996 None,
1997 &YamlWorkflowRunOptions::default(),
1998 )
1999 .await
2000}
2001
2002pub async fn run_email_workflow_yaml_with_client_and_custom_worker(
2003 workflow: &YamlWorkflow,
2004 email_text: &str,
2005 client: &SimpleAgentsClient,
2006 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2007) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2008 let workflow_input = json!({ "email_text": email_text });
2009 run_workflow_yaml_with_client_and_custom_worker(
2010 workflow,
2011 &workflow_input,
2012 client,
2013 custom_worker,
2014 )
2015 .await
2016}
2017
2018pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events(
2019 workflow: &YamlWorkflow,
2020 workflow_input: &Value,
2021 client: &SimpleAgentsClient,
2022 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2023 event_sink: Option<&dyn YamlWorkflowEventSink>,
2024) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2025 run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2026 workflow,
2027 workflow_input,
2028 client,
2029 custom_worker,
2030 event_sink,
2031 &YamlWorkflowRunOptions::default(),
2032 )
2033 .await
2034}
2035
2036pub async fn run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
2037 workflow: &YamlWorkflow,
2038 workflow_input: &Value,
2039 client: &SimpleAgentsClient,
2040 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
2041 event_sink: Option<&dyn YamlWorkflowEventSink>,
2042 options: &YamlWorkflowRunOptions,
2043) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
2044 struct BorrowedClientExecutor<'a> {
2045 client: &'a SimpleAgentsClient,
2046 custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
2047 run_options: YamlWorkflowRunOptions,
2048 }
2049
2050 #[async_trait]
2051 impl<'a> YamlWorkflowLlmExecutor for BorrowedClientExecutor<'a> {
2052 async fn complete_structured(
2053 &self,
2054 request: YamlLlmExecutionRequest,
2055 event_sink: Option<&dyn YamlWorkflowEventSink>,
2056 ) -> Result<YamlLlmExecutionResult, String> {
2057 let expects_object = schema_expects_object(&request.schema);
2058 let messages = if let Some(mut history) = request.messages.clone() {
2059 if request.append_prompt_as_user && !request.prompt.trim().is_empty() {
2060 history.push(Message::user(&request.prompt));
2061 }
2062 history
2063 } else {
2064 vec![
2065 Message::system("You execute workflow classification steps."),
2066 Message::user(&request.prompt),
2067 ]
2068 };
2069
2070 if !request.tools.is_empty() {
2071 let mut tool_traces: Vec<YamlToolCallTrace> = Vec::new();
2072 let mut conversation = messages;
2073 let mut usage_total: Option<YamlLlmTokenUsage> = None;
2074
2075 for roundtrip in 0..=request.max_tool_roundtrips {
2076 let mut builder = CompletionRequest::builder()
2077 .model(&request.model)
2078 .messages(conversation.clone())
2079 .tools(request.tools.iter().map(|t| t.definition.clone()).collect());
2080
2081 if request.heal && expects_object {
2082 builder = builder.json_schema("workflow_step", request.schema.clone());
2083 }
2084
2085 if let Some(choice) = request.tool_choice.clone() {
2086 builder = builder.tool_choice(choice);
2087 }
2088
2089 if request.stream {
2090 builder = builder.stream(true);
2091 }
2092
2093 let completion_request = builder
2094 .build()
2095 .map_err(|error| format!("failed to build completion request: {error}"))?;
2096
2097 let outcome = self
2098 .client
2099 .complete(&completion_request, CompletionOptions::default())
2100 .await
2101 .map_err(|error| error.to_string())?;
2102
2103 let mut streamed_tool_calls: Option<Vec<ToolCall>> = None;
2104 let mut streamed_content = String::new();
2105 let mut finish_reason = FinishReason::Stop;
2106
2107 match outcome {
2108 CompletionOutcome::Response(response) => {
2109 if let Some(usage) = usage_total.as_mut() {
2110 usage.prompt_tokens += response.usage.prompt_tokens;
2111 usage.completion_tokens += response.usage.completion_tokens;
2112 usage.total_tokens += response.usage.total_tokens;
2113 if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2114 usage.reasoning_tokens = Some(
2115 usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2116 );
2117 }
2118 } else {
2119 usage_total = Some(YamlLlmTokenUsage {
2120 prompt_tokens: response.usage.prompt_tokens,
2121 completion_tokens: response.usage.completion_tokens,
2122 total_tokens: response.usage.total_tokens,
2123 reasoning_tokens: response.usage.reasoning_tokens,
2124 });
2125 }
2126
2127 let choice = response
2128 .choices
2129 .first()
2130 .ok_or_else(|| "completion returned no choices".to_string())?;
2131 streamed_content = choice.message.content.clone();
2132 streamed_tool_calls = choice.message.tool_calls.clone();
2133 finish_reason = choice.finish_reason;
2134 }
2135 CompletionOutcome::HealedJson(healed) => {
2136 let response = healed.response;
2137 if let Some(usage) = usage_total.as_mut() {
2138 usage.prompt_tokens += response.usage.prompt_tokens;
2139 usage.completion_tokens += response.usage.completion_tokens;
2140 usage.total_tokens += response.usage.total_tokens;
2141 if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2142 usage.reasoning_tokens = Some(
2143 usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2144 );
2145 }
2146 } else {
2147 usage_total = Some(YamlLlmTokenUsage {
2148 prompt_tokens: response.usage.prompt_tokens,
2149 completion_tokens: response.usage.completion_tokens,
2150 total_tokens: response.usage.total_tokens,
2151 reasoning_tokens: response.usage.reasoning_tokens,
2152 });
2153 }
2154
2155 let choice = response
2156 .choices
2157 .first()
2158 .ok_or_else(|| "completion returned no choices".to_string())?;
2159 streamed_content = choice.message.content.clone();
2160 streamed_tool_calls = choice.message.tool_calls.clone();
2161 finish_reason = choice.finish_reason;
2162 }
2163 CompletionOutcome::CoercedSchema(coerced) => {
2164 let response = coerced.response;
2165 if let Some(usage) = usage_total.as_mut() {
2166 usage.prompt_tokens += response.usage.prompt_tokens;
2167 usage.completion_tokens += response.usage.completion_tokens;
2168 usage.total_tokens += response.usage.total_tokens;
2169 if let Some(reasoning_tokens) = response.usage.reasoning_tokens {
2170 usage.reasoning_tokens = Some(
2171 usage.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2172 );
2173 }
2174 } else {
2175 usage_total = Some(YamlLlmTokenUsage {
2176 prompt_tokens: response.usage.prompt_tokens,
2177 completion_tokens: response.usage.completion_tokens,
2178 total_tokens: response.usage.total_tokens,
2179 reasoning_tokens: response.usage.reasoning_tokens,
2180 });
2181 }
2182
2183 let choice = response
2184 .choices
2185 .first()
2186 .ok_or_else(|| "completion returned no choices".to_string())?;
2187 streamed_content = choice.message.content.clone();
2188 streamed_tool_calls = choice.message.tool_calls.clone();
2189 finish_reason = choice.finish_reason;
2190 }
2191 CompletionOutcome::Stream(mut stream) => {
2192 let mut final_stream_usage: Option<simple_agent_type::response::Usage> =
2193 None;
2194 let mut delta_filter = StructuredJsonDeltaFilter::default();
2195 let include_raw_debug = include_raw_stream_debug_events();
2196 let mut json_text_formatter = if request.stream_json_as_text {
2197 Some(StreamJsonAsTextFormatter::default())
2198 } else {
2199 None
2200 };
2201 let mut tool_calls_by_index: HashMap<u32, ToolCall> = HashMap::new();
2202
2203 while let Some(chunk_result) = stream.next().await {
2204 if event_sink_is_cancelled(event_sink) {
2205 return Err(workflow_event_sink_cancelled_message().to_string());
2206 }
2207
2208 let chunk = chunk_result.map_err(|error| error.to_string())?;
2209 if let Some(usage) = chunk.usage {
2210 final_stream_usage = Some(usage);
2211 }
2212
2213 if let Some(choice) = chunk.choices.first() {
2214 if let Some(chunk_finish_reason) = choice.finish_reason {
2215 finish_reason = chunk_finish_reason;
2216 }
2217
2218 if include_raw_debug {
2219 if let Some(reasoning_delta) =
2220 choice.delta.reasoning_content.as_ref()
2221 {
2222 if let Some(sink) = event_sink {
2223 sink.emit(&YamlWorkflowEvent {
2224 event_type: "node_stream_thinking_delta"
2225 .to_string(),
2226 node_id: Some(request.node_id.clone()),
2227 step_id: Some(request.node_id.clone()),
2228 node_kind: Some("llm_call".to_string()),
2229 streamable: Some(true),
2230 message: None,
2231 delta: Some(reasoning_delta.clone()),
2232 token_kind: Some(
2233 YamlWorkflowTokenKind::Thinking,
2234 ),
2235 is_terminal_node_token: Some(
2236 request.is_terminal_node,
2237 ),
2238 elapsed_ms: None,
2239 metadata: None,
2240 });
2241 }
2242 }
2243 }
2244
2245 if let Some(delta) = choice.delta.content.clone() {
2246 streamed_content.push_str(delta.as_str());
2247 let (output_delta, thinking_delta) = if expects_object {
2248 delta_filter.split(delta.as_str())
2249 } else {
2250 (Some(delta.clone()), None)
2251 };
2252 let rendered_output_delta = if let Some(output_chunk) =
2253 output_delta
2254 {
2255 if let Some(formatter) = json_text_formatter.as_mut() {
2256 formatter.push(output_chunk.as_str());
2257 formatter.emit_if_ready(delta_filter.completed())
2258 } else {
2259 Some(output_chunk)
2260 }
2261 } else {
2262 None
2263 };
2264
2265 if include_raw_debug {
2266 if let Some(sink) = event_sink {
2267 if let Some(raw_thinking_delta) =
2268 thinking_delta.as_ref()
2269 {
2270 sink.emit(&YamlWorkflowEvent {
2271 event_type: "node_stream_thinking_delta"
2272 .to_string(),
2273 node_id: Some(request.node_id.clone()),
2274 step_id: Some(request.node_id.clone()),
2275 node_kind: Some("llm_call".to_string()),
2276 streamable: Some(true),
2277 message: None,
2278 delta: Some(raw_thinking_delta.clone()),
2279 token_kind: Some(
2280 YamlWorkflowTokenKind::Thinking,
2281 ),
2282 is_terminal_node_token: Some(
2283 request.is_terminal_node,
2284 ),
2285 elapsed_ms: None,
2286 metadata: None,
2287 });
2288 }
2289 if let Some(raw_output_delta) =
2290 rendered_output_delta.as_ref()
2291 {
2292 sink.emit(&YamlWorkflowEvent {
2293 event_type: "node_stream_output_delta"
2294 .to_string(),
2295 node_id: Some(request.node_id.clone()),
2296 step_id: Some(request.node_id.clone()),
2297 node_kind: Some("llm_call".to_string()),
2298 streamable: Some(true),
2299 message: None,
2300 delta: Some(raw_output_delta.clone()),
2301 token_kind: Some(
2302 YamlWorkflowTokenKind::Output,
2303 ),
2304 is_terminal_node_token: Some(
2305 request.is_terminal_node,
2306 ),
2307 elapsed_ms: None,
2308 metadata: None,
2309 });
2310 }
2311 }
2312 }
2313
2314 if let Some(filtered_delta) = rendered_output_delta {
2315 if let Some(sink) = event_sink {
2316 sink.emit(&YamlWorkflowEvent {
2317 event_type: "node_stream_delta".to_string(),
2318 node_id: Some(request.node_id.clone()),
2319 step_id: Some(request.node_id.clone()),
2320 node_kind: Some("llm_call".to_string()),
2321 streamable: Some(true),
2322 message: None,
2323 delta: Some(filtered_delta),
2324 token_kind: Some(YamlWorkflowTokenKind::Output),
2325 is_terminal_node_token: Some(
2326 request.is_terminal_node,
2327 ),
2328 elapsed_ms: None,
2329 metadata: None,
2330 });
2331 }
2332 }
2333 }
2334
2335 if let Some(tool_call_deltas) = choice.delta.tool_calls.as_ref()
2336 {
2337 for tool_call_delta in tool_call_deltas {
2338 let entry = tool_calls_by_index
2339 .entry(tool_call_delta.index)
2340 .or_insert_with(|| ToolCall {
2341 id: tool_call_delta.id.clone().unwrap_or_else(
2342 || {
2343 format!(
2344 "tool_call_{}",
2345 tool_call_delta.index
2346 )
2347 },
2348 ),
2349 tool_type: ToolType::Function,
2350 function:
2351 simple_agent_type::tool::ToolCallFunction {
2352 name: String::new(),
2353 arguments: String::new(),
2354 },
2355 });
2356
2357 if let Some(id) = tool_call_delta.id.as_ref() {
2358 entry.id = id.clone();
2359 }
2360 if let Some(tool_type) = tool_call_delta.tool_type {
2361 entry.tool_type = tool_type;
2362 }
2363 if let Some(function_delta) =
2364 tool_call_delta.function.as_ref()
2365 {
2366 if let Some(name) = function_delta.name.as_ref() {
2367 entry.function.name = name.clone();
2368 }
2369 if let Some(arguments) =
2370 function_delta.arguments.as_ref()
2371 {
2372 entry.function.arguments.push_str(arguments);
2373 }
2374 }
2375 }
2376 }
2377 }
2378
2379 if event_sink_is_cancelled(event_sink) {
2380 return Err(workflow_event_sink_cancelled_message().to_string());
2381 }
2382 }
2383
2384 if let Some(usage) = final_stream_usage {
2385 if let Some(total) = usage_total.as_mut() {
2386 total.prompt_tokens += usage.prompt_tokens;
2387 total.completion_tokens += usage.completion_tokens;
2388 total.total_tokens += usage.total_tokens;
2389 if let Some(reasoning_tokens) = usage.reasoning_tokens {
2390 total.reasoning_tokens = Some(
2391 total.reasoning_tokens.unwrap_or(0) + reasoning_tokens,
2392 );
2393 }
2394 } else {
2395 usage_total = Some(YamlLlmTokenUsage {
2396 prompt_tokens: usage.prompt_tokens,
2397 completion_tokens: usage.completion_tokens,
2398 total_tokens: usage.total_tokens,
2399 reasoning_tokens: usage.reasoning_tokens,
2400 });
2401 }
2402 }
2403
2404 let mut ordered_tool_calls =
2405 tool_calls_by_index.into_iter().collect::<Vec<_>>();
2406 ordered_tool_calls.sort_by_key(|(index, _)| *index);
2407 if !ordered_tool_calls.is_empty() {
2408 streamed_tool_calls = Some(
2409 ordered_tool_calls
2410 .into_iter()
2411 .map(|(_, tool_call)| tool_call)
2412 .collect::<Vec<_>>(),
2413 );
2414 }
2415 }
2416 }
2417
2418 let has_tool_calls = streamed_tool_calls
2419 .as_ref()
2420 .is_some_and(|calls| !calls.is_empty());
2421 if finish_reason != FinishReason::ToolCalls && !has_tool_calls {
2422 let payload = if expects_object {
2423 parse_streamed_structured_payload(
2424 streamed_content.as_str(),
2425 request.heal,
2426 )
2427 .map_err(|error| {
2428 format!("failed to parse structured completion JSON: {error}")
2429 })?
2430 .payload
2431 } else {
2432 Value::String(streamed_content.clone())
2433 };
2434 return Ok(YamlLlmExecutionResult {
2435 payload,
2436 usage: usage_total,
2437 ttft_ms: None,
2438 tool_calls: tool_traces,
2439 });
2440 }
2441
2442 if roundtrip >= request.max_tool_roundtrips {
2443 return Err(format!(
2444 "tool call roundtrip limit reached for node '{}' (max={})",
2445 request.node_id, request.max_tool_roundtrips
2446 ));
2447 }
2448
2449 let tool_calls: Vec<ToolCall> = streamed_tool_calls.ok_or_else(|| {
2450 "finish_reason=tool_calls but no tool calls found".to_string()
2451 })?;
2452 if tool_calls
2453 .iter()
2454 .any(|tool_call| tool_call.function.name.trim().is_empty())
2455 {
2456 return Err("streamed tool call missing function name".to_string());
2457 }
2458
2459 let assistant_tool_message =
2460 Message::assistant(&streamed_content).with_tool_calls(tool_calls.clone());
2461 conversation.push(assistant_tool_message);
2462
2463 for tool_call in tool_calls {
2464 let tool_call_id = tool_call.id.clone();
2465 let tool_name = tool_call.function.name.clone();
2466 let tool_started = Instant::now();
2467 let arguments: Value = serde_json::from_str(&tool_call.function.arguments)
2468 .map_err(|error| {
2469 format!(
2470 "tool '{}' arguments must be valid JSON: {}",
2471 tool_name, error
2472 )
2473 })?;
2474 let mut tool_span_context: Option<TraceContext> = None;
2475 let mut tool_span = if request.trace_sampled {
2476 let (span_context, mut span) = workflow_tracer().start_span(
2477 "workflow.tool.execute",
2478 SpanKind::Node,
2479 request.trace_context.as_ref(),
2480 );
2481 tool_span_context = Some(span_context);
2482 span.set_attribute(
2483 "trace_id",
2484 request.trace_id.as_deref().unwrap_or_default(),
2485 );
2486 span.set_attribute("node_id", request.node_id.as_str());
2487 span.set_attribute("node_kind", "llm_call");
2488 span.set_attribute("tool_name", tool_name.as_str());
2489 span.set_attribute("tool_call_id", tool_call_id.as_str());
2490 let args_for_span =
2491 payload_for_tool_trace(request.tool_trace_mode, &arguments)
2492 .to_string();
2493 span.set_attribute("tool_arguments", args_for_span.as_str());
2494 Some(span)
2495 } else {
2496 None
2497 };
2498
2499 if request.tool_trace_mode != YamlToolTraceMode::Off {
2500 if let Some(sink) = event_sink {
2501 sink.emit(&YamlWorkflowEvent {
2502 event_type: "node_tool_call_requested".to_string(),
2503 node_id: Some(request.node_id.clone()),
2504 step_id: Some(request.node_id.clone()),
2505 node_kind: Some("llm_call".to_string()),
2506 streamable: Some(false),
2507 message: Some(format!(
2508 "tool call requested: {}",
2509 tool_name
2510 )),
2511 delta: None,
2512 token_kind: None,
2513 is_terminal_node_token: None,
2514 elapsed_ms: None,
2515 metadata: Some(json!({
2516 "tool_call_id": tool_call_id.clone(),
2517 "tool_name": tool_name.clone(),
2518 "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2519 })),
2520 });
2521 }
2522 }
2523
2524 let Some(tool_config) = request
2525 .tools
2526 .iter()
2527 .find(|tool| tool.definition.function.name == tool_name)
2528 else {
2529 return Err(format!("model requested unknown tool '{}'", tool_name));
2530 };
2531
2532 let tool_output_result = if tool_name == "run_workflow_graph" {
2533 execute_subworkflow_tool_call(
2534 &arguments,
2535 &request.execution_context,
2536 self.client,
2537 self.custom_worker,
2538 &self.run_options,
2539 tool_span_context.as_ref(),
2540 request.trace_id.as_deref(),
2541 )
2542 .await
2543 } else if let Some(custom_worker) = self.custom_worker {
2544 custom_worker
2545 .execute(
2546 tool_name.as_str(),
2547 &arguments,
2548 request.email_text.as_str(),
2549 &request.execution_context,
2550 )
2551 .await
2552 } else {
2553 Err(format!(
2554 "tool '{}' requested but no custom worker executor is configured",
2555 tool_name
2556 ))
2557 };
2558
2559 let tool_output = match tool_output_result {
2560 Ok(output) => output,
2561 Err(message) => {
2562 let elapsed_ms = tool_started.elapsed().as_millis();
2563 if let Some(span) = tool_span.as_mut() {
2564 span.add_event("workflow.tool.execute.error");
2565 span.set_attribute("tool_status", "error");
2566 span.set_attribute("tool_error", message.as_str());
2567 span.set_attribute(
2568 "elapsed_ms",
2569 elapsed_ms.to_string().as_str(),
2570 );
2571 }
2572 if request.tool_trace_mode != YamlToolTraceMode::Off {
2573 if let Some(sink) = event_sink {
2574 sink.emit(&YamlWorkflowEvent {
2575 event_type: "node_tool_call_failed".to_string(),
2576 node_id: Some(request.node_id.clone()),
2577 step_id: Some(request.node_id.clone()),
2578 node_kind: Some("llm_call".to_string()),
2579 streamable: Some(false),
2580 message: Some(message.clone()),
2581 delta: None,
2582 token_kind: None,
2583 is_terminal_node_token: None,
2584 elapsed_ms: Some(elapsed_ms),
2585 metadata: Some(json!({
2586 "tool_call_id": tool_call_id.clone(),
2587 "tool_name": tool_name.clone(),
2588 })),
2589 });
2590 }
2591 }
2592 tool_traces.push(YamlToolCallTrace {
2593 id: tool_call_id.clone(),
2594 name: tool_name.clone(),
2595 arguments,
2596 output: None,
2597 status: "error".to_string(),
2598 elapsed_ms,
2599 error: Some(message.clone()),
2600 });
2601 if let Some(span) = tool_span.take() {
2602 span.end();
2603 }
2604 return Err(format!("tool '{}' failed: {}", tool_name, message));
2605 }
2606 };
2607
2608 if let Some(output_schema) = tool_config.output_schema.as_ref() {
2609 validate_schema_instance(output_schema, &tool_output).map_err(
2610 |message| {
2611 format!(
2612 "tool '{}' output failed schema validation: {}",
2613 tool_name, message
2614 )
2615 },
2616 )?;
2617 }
2618
2619 let elapsed_ms = tool_started.elapsed().as_millis();
2620 if let Some(span) = tool_span.as_mut() {
2621 span.add_event("workflow.tool.execute.completed");
2622 span.set_attribute("tool_status", "ok");
2623 span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
2624 let output_for_span =
2625 payload_for_tool_trace(request.tool_trace_mode, &tool_output)
2626 .to_string();
2627 span.set_attribute("tool_output", output_for_span.as_str());
2628 }
2629 if request.tool_trace_mode != YamlToolTraceMode::Off {
2630 if let Some(sink) = event_sink {
2631 sink.emit(&YamlWorkflowEvent {
2632 event_type: "node_tool_call_completed".to_string(),
2633 node_id: Some(request.node_id.clone()),
2634 step_id: Some(request.node_id.clone()),
2635 node_kind: Some("llm_call".to_string()),
2636 streamable: Some(false),
2637 message: Some(format!(
2638 "tool call completed: {}",
2639 tool_name
2640 )),
2641 delta: None,
2642 token_kind: None,
2643 is_terminal_node_token: None,
2644 elapsed_ms: Some(elapsed_ms),
2645 metadata: Some(json!({
2646 "tool_call_id": tool_call_id.clone(),
2647 "tool_name": tool_name.clone(),
2648 "arguments": payload_for_tool_trace(request.tool_trace_mode, &arguments),
2649 "output": payload_for_tool_trace(request.tool_trace_mode, &tool_output),
2650 })),
2651 });
2652 }
2653 }
2654
2655 tool_traces.push(YamlToolCallTrace {
2656 id: tool_call_id.clone(),
2657 name: tool_name.clone(),
2658 arguments: arguments.clone(),
2659 output: Some(tool_output.clone()),
2660 status: "ok".to_string(),
2661 elapsed_ms,
2662 error: None,
2663 });
2664
2665 conversation.push(Message::tool(
2666 serde_json::to_string(&tool_output).map_err(|error| {
2667 format!("failed to serialize tool output: {error}")
2668 })?,
2669 tool_call_id,
2670 ));
2671 if let Some(span) = tool_span.take() {
2672 span.end();
2673 }
2674 }
2675
2676 if request.tool_trace_mode != YamlToolTraceMode::Off {
2677 if let Some(sink) = event_sink {
2678 sink.emit(&YamlWorkflowEvent {
2679 event_type: "node_tool_roundtrip_completed".to_string(),
2680 node_id: Some(request.node_id.clone()),
2681 step_id: Some(request.node_id.clone()),
2682 node_kind: Some("llm_call".to_string()),
2683 streamable: Some(false),
2684 message: Some(format!(
2685 "tool roundtrip {} completed",
2686 roundtrip + 1
2687 )),
2688 delta: None,
2689 token_kind: None,
2690 is_terminal_node_token: None,
2691 elapsed_ms: None,
2692 metadata: Some(json!({
2693 "roundtrip": roundtrip + 1,
2694 "max_tool_roundtrips": request.max_tool_roundtrips,
2695 })),
2696 });
2697 }
2698 }
2699 }
2700
2701 return Err(format!(
2702 "tool-enabled llm_call '{}' exhausted loop without final payload",
2703 request.node_id
2704 ));
2705 }
2706
2707 let mut builder = CompletionRequest::builder()
2708 .model(&request.model)
2709 .messages(messages);
2710
2711 if request.heal && !request.stream && expects_object {
2712 builder = builder.json_schema("workflow_step", request.schema.clone());
2713 }
2714
2715 if request.stream {
2716 builder = builder.stream(true);
2717 }
2718
2719 let completion_request = builder
2720 .build()
2721 .map_err(|error| format!("failed to build completion request: {error}"))?;
2722
2723 let completion_options = if request.heal && !request.stream && expects_object {
2724 CompletionOptions {
2725 mode: CompletionMode::HealedJson,
2726 }
2727 } else {
2728 CompletionOptions::default()
2729 };
2730
2731 let outcome = self
2732 .client
2733 .complete(&completion_request, completion_options)
2734 .await
2735 .map_err(|error| error.to_string())?;
2736
2737 match outcome {
2738 CompletionOutcome::Stream(mut stream) => {
2739 let mut aggregated = String::new();
2740 let mut final_stream_usage: Option<simple_agent_type::response::Usage> = None;
2741 let stream_started = Instant::now();
2742 let mut ttft_ms: Option<u128> = None;
2743 let mut delta_filter = StructuredJsonDeltaFilter::default();
2744 let include_raw_debug = include_raw_stream_debug_events();
2745 let mut json_text_formatter = if request.stream_json_as_text {
2746 Some(StreamJsonAsTextFormatter::default())
2747 } else {
2748 None
2749 };
2750 while let Some(chunk_result) = stream.next().await {
2751 if event_sink_is_cancelled(event_sink) {
2752 return Err(workflow_event_sink_cancelled_message().to_string());
2753 }
2754 let chunk = chunk_result.map_err(|error| error.to_string())?;
2755 if let Some(usage) = chunk.usage {
2756 final_stream_usage = Some(usage);
2757 }
2758 if let Some(choice) = chunk.choices.first() {
2759 if ttft_ms.is_none()
2760 && (choice
2761 .delta
2762 .content
2763 .as_ref()
2764 .is_some_and(|delta| !delta.is_empty())
2765 || choice
2766 .delta
2767 .reasoning_content
2768 .as_ref()
2769 .is_some_and(|delta| !delta.is_empty()))
2770 {
2771 ttft_ms = Some(stream_started.elapsed().as_millis());
2772 }
2773 if include_raw_debug {
2774 if let Some(reasoning_delta) =
2775 choice.delta.reasoning_content.as_ref()
2776 {
2777 if let Some(sink) = event_sink {
2778 sink.emit(&YamlWorkflowEvent {
2779 event_type: "node_stream_thinking_delta".to_string(),
2780 node_id: Some(request.node_id.clone()),
2781 step_id: Some(request.node_id.clone()),
2782 node_kind: Some("llm_call".to_string()),
2783 streamable: Some(true),
2784 message: None,
2785 delta: Some(reasoning_delta.clone()),
2786 token_kind: Some(YamlWorkflowTokenKind::Thinking),
2787 is_terminal_node_token: Some(request.is_terminal_node),
2788 elapsed_ms: None,
2789 metadata: None,
2790 });
2791 }
2792 }
2793 }
2794 if let Some(delta) = choice.delta.content.clone() {
2795 aggregated.push_str(delta.as_str());
2796 let (output_delta, thinking_delta) = if expects_object {
2797 delta_filter.split(delta.as_str())
2798 } else {
2799 (Some(delta.clone()), None)
2800 };
2801 let rendered_output_delta = if let Some(output_chunk) = output_delta
2802 {
2803 if let Some(formatter) = json_text_formatter.as_mut() {
2804 formatter.push(output_chunk.as_str());
2805 formatter.emit_if_ready(delta_filter.completed())
2806 } else {
2807 Some(output_chunk)
2808 }
2809 } else {
2810 None
2811 };
2812 if include_raw_debug {
2813 if let Some(sink) = event_sink {
2814 if let Some(raw_thinking_delta) = thinking_delta.as_ref() {
2815 sink.emit(&YamlWorkflowEvent {
2816 event_type: "node_stream_thinking_delta"
2817 .to_string(),
2818 node_id: Some(request.node_id.clone()),
2819 step_id: Some(request.node_id.clone()),
2820 node_kind: Some("llm_call".to_string()),
2821 streamable: Some(true),
2822 message: None,
2823 delta: Some(raw_thinking_delta.clone()),
2824 token_kind: Some(YamlWorkflowTokenKind::Thinking),
2825 is_terminal_node_token: Some(
2826 request.is_terminal_node,
2827 ),
2828 elapsed_ms: None,
2829 metadata: None,
2830 });
2831 }
2832 if let Some(raw_output_delta) =
2833 rendered_output_delta.as_ref()
2834 {
2835 sink.emit(&YamlWorkflowEvent {
2836 event_type: "node_stream_output_delta".to_string(),
2837 node_id: Some(request.node_id.clone()),
2838 step_id: Some(request.node_id.clone()),
2839 node_kind: Some("llm_call".to_string()),
2840 streamable: Some(true),
2841 message: None,
2842 delta: Some(raw_output_delta.clone()),
2843 token_kind: Some(YamlWorkflowTokenKind::Output),
2844 is_terminal_node_token: Some(
2845 request.is_terminal_node,
2846 ),
2847 elapsed_ms: None,
2848 metadata: None,
2849 });
2850 }
2851 }
2852 }
2853 if let Some(filtered_delta) = rendered_output_delta {
2854 if let Some(sink) = event_sink {
2855 sink.emit(&YamlWorkflowEvent {
2856 event_type: "node_stream_delta".to_string(),
2857 node_id: Some(request.node_id.clone()),
2858 step_id: Some(request.node_id.clone()),
2859 node_kind: Some("llm_call".to_string()),
2860 streamable: Some(true),
2861 message: None,
2862 delta: Some(filtered_delta),
2863 token_kind: Some(YamlWorkflowTokenKind::Output),
2864 is_terminal_node_token: Some(request.is_terminal_node),
2865 elapsed_ms: None,
2866 metadata: None,
2867 });
2868 }
2869 }
2870 }
2871 }
2872
2873 if event_sink_is_cancelled(event_sink) {
2874 return Err(workflow_event_sink_cancelled_message().to_string());
2875 }
2876 }
2877
2878 let payload = if expects_object {
2879 let resolved =
2880 parse_streamed_structured_payload(aggregated.as_str(), request.heal)?;
2881 if let Some(confidence) = resolved.heal_confidence {
2882 if let Some(sink) = event_sink {
2883 sink.emit(&YamlWorkflowEvent {
2884 event_type: "node_healed".to_string(),
2885 node_id: Some(request.node_id.clone()),
2886 step_id: Some(request.node_id.clone()),
2887 node_kind: Some("llm_call".to_string()),
2888 streamable: Some(true),
2889 message: Some(format!(
2890 "healed streamed structured response confidence={confidence}"
2891 )),
2892 delta: None,
2893 token_kind: None,
2894 is_terminal_node_token: None,
2895 elapsed_ms: None,
2896 metadata: None,
2897 });
2898 }
2899 }
2900 resolved.payload
2901 } else {
2902 Value::String(aggregated)
2903 };
2904
2905 Ok(YamlLlmExecutionResult {
2906 payload,
2907 usage: final_stream_usage.map(|usage| YamlLlmTokenUsage {
2908 prompt_tokens: usage.prompt_tokens,
2909 completion_tokens: usage.completion_tokens,
2910 total_tokens: usage.total_tokens,
2911 reasoning_tokens: usage.reasoning_tokens,
2912 }),
2913 ttft_ms,
2914 tool_calls: Vec::new(),
2915 })
2916 }
2917 CompletionOutcome::Response(response) => {
2918 let payload = if expects_object {
2919 let content = response
2920 .content()
2921 .ok_or_else(|| "completion returned empty content".to_string())?;
2922 serde_json::from_str(content).map_err(|error| {
2923 format!("failed to parse structured completion JSON: {error}")
2924 })?
2925 } else {
2926 Value::String(response.content().unwrap_or_default().to_string())
2927 };
2928
2929 Ok(YamlLlmExecutionResult {
2930 payload,
2931 usage: Some(YamlLlmTokenUsage {
2932 prompt_tokens: response.usage.prompt_tokens,
2933 completion_tokens: response.usage.completion_tokens,
2934 total_tokens: response.usage.total_tokens,
2935 reasoning_tokens: response.usage.reasoning_tokens,
2936 }),
2937 ttft_ms: None,
2938 tool_calls: Vec::new(),
2939 })
2940 }
2941 CompletionOutcome::HealedJson(healed) => {
2942 if !expects_object {
2943 return Err(
2944 "healed json outcome is unsupported for non-object schema".to_string()
2945 );
2946 }
2947 if let Some(sink) = event_sink {
2948 sink.emit(&YamlWorkflowEvent {
2949 event_type: "node_healed".to_string(),
2950 node_id: Some(request.node_id.clone()),
2951 step_id: Some(request.node_id.clone()),
2952 node_kind: Some("llm_call".to_string()),
2953 streamable: Some(request.stream),
2954 message: Some(format!(
2955 "healed structured response confidence={}",
2956 healed.parsed.confidence
2957 )),
2958 delta: None,
2959 token_kind: None,
2960 is_terminal_node_token: None,
2961 elapsed_ms: None,
2962 metadata: None,
2963 });
2964 }
2965 Ok(YamlLlmExecutionResult {
2966 payload: healed.parsed.value,
2967 usage: Some(YamlLlmTokenUsage {
2968 prompt_tokens: healed.response.usage.prompt_tokens,
2969 completion_tokens: healed.response.usage.completion_tokens,
2970 total_tokens: healed.response.usage.total_tokens,
2971 reasoning_tokens: healed.response.usage.reasoning_tokens,
2972 }),
2973 ttft_ms: None,
2974 tool_calls: Vec::new(),
2975 })
2976 }
2977 CompletionOutcome::CoercedSchema(coerced) => {
2978 if !expects_object {
2979 return Err(
2980 "coerced schema outcome is unsupported for non-object schema"
2981 .to_string(),
2982 );
2983 }
2984 Ok(YamlLlmExecutionResult {
2985 payload: coerced.coerced.value,
2986 usage: Some(YamlLlmTokenUsage {
2987 prompt_tokens: coerced.response.usage.prompt_tokens,
2988 completion_tokens: coerced.response.usage.completion_tokens,
2989 total_tokens: coerced.response.usage.total_tokens,
2990 reasoning_tokens: coerced.response.usage.reasoning_tokens,
2991 }),
2992 ttft_ms: None,
2993 tool_calls: Vec::new(),
2994 })
2995 }
2996 }
2997 }
2998 }
2999
3000 let executor = BorrowedClientExecutor {
3001 client,
3002 custom_worker,
3003 run_options: options.clone(),
3004 };
3005 run_workflow_yaml_with_custom_worker_and_events_and_options(
3006 workflow,
3007 workflow_input,
3008 &executor,
3009 custom_worker,
3010 event_sink,
3011 options,
3012 )
3013 .await
3014}
3015
3016pub async fn run_email_workflow_yaml_with_client_and_custom_worker_and_events(
3017 workflow: &YamlWorkflow,
3018 email_text: &str,
3019 client: &SimpleAgentsClient,
3020 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3021 event_sink: Option<&dyn YamlWorkflowEventSink>,
3022) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3023 let workflow_input = json!({ "email_text": email_text });
3024 run_workflow_yaml_with_client_and_custom_worker_and_events(
3025 workflow,
3026 &workflow_input,
3027 client,
3028 custom_worker,
3029 event_sink,
3030 )
3031 .await
3032}
3033
3034pub async fn run_workflow_yaml(
3035 workflow: &YamlWorkflow,
3036 workflow_input: &Value,
3037 executor: &dyn YamlWorkflowLlmExecutor,
3038) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3039 run_workflow_yaml_with_custom_worker_and_events(workflow, workflow_input, executor, None, None)
3040 .await
3041}
3042
3043pub async fn run_email_workflow_yaml(
3044 workflow: &YamlWorkflow,
3045 email_text: &str,
3046 executor: &dyn YamlWorkflowLlmExecutor,
3047) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3048 let workflow_input = json!({ "email_text": email_text });
3049 run_workflow_yaml(workflow, &workflow_input, executor).await
3050}
3051
3052pub async fn run_workflow_yaml_with_custom_worker(
3053 workflow: &YamlWorkflow,
3054 workflow_input: &Value,
3055 executor: &dyn YamlWorkflowLlmExecutor,
3056 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3057) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3058 run_workflow_yaml_with_custom_worker_and_events(
3059 workflow,
3060 workflow_input,
3061 executor,
3062 custom_worker,
3063 None,
3064 )
3065 .await
3066}
3067
3068pub async fn run_email_workflow_yaml_with_custom_worker(
3069 workflow: &YamlWorkflow,
3070 email_text: &str,
3071 executor: &dyn YamlWorkflowLlmExecutor,
3072 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3073) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3074 let workflow_input = json!({ "email_text": email_text });
3075 run_workflow_yaml_with_custom_worker(workflow, &workflow_input, executor, custom_worker).await
3076}
3077
3078pub async fn run_workflow_yaml_with_custom_worker_and_events(
3079 workflow: &YamlWorkflow,
3080 workflow_input: &Value,
3081 executor: &dyn YamlWorkflowLlmExecutor,
3082 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3083 event_sink: Option<&dyn YamlWorkflowEventSink>,
3084) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3085 run_workflow_yaml_with_custom_worker_and_events_and_options(
3086 workflow,
3087 workflow_input,
3088 executor,
3089 custom_worker,
3090 event_sink,
3091 &YamlWorkflowRunOptions::default(),
3092 )
3093 .await
3094}
3095
3096pub async fn run_workflow_yaml_with_custom_worker_and_events_and_options(
3097 workflow: &YamlWorkflow,
3098 workflow_input: &Value,
3099 executor: &dyn YamlWorkflowLlmExecutor,
3100 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
3101 event_sink: Option<&dyn YamlWorkflowEventSink>,
3102 options: &YamlWorkflowRunOptions,
3103) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
3104 if !workflow_input.is_object() {
3105 return Err(YamlWorkflowRunError::InvalidInput {
3106 message: "workflow input must be a JSON object".to_string(),
3107 });
3108 }
3109
3110 validate_sample_rate(options.telemetry.sample_rate)?;
3111
3112 let email_text = workflow_input
3113 .get("email_text")
3114 .and_then(Value::as_str)
3115 .unwrap_or_default();
3116
3117 let diagnostics = verify_yaml_workflow(workflow);
3118 let errors: Vec<YamlWorkflowDiagnostic> = diagnostics
3119 .iter()
3120 .filter(|d| d.severity == YamlWorkflowDiagnosticSeverity::Error)
3121 .cloned()
3122 .collect();
3123 if !errors.is_empty() {
3124 return Err(YamlWorkflowRunError::Validation {
3125 diagnostics_count: errors.len(),
3126 diagnostics: errors,
3127 });
3128 }
3129
3130 if let Some(output) =
3131 try_run_yaml_via_ir_runtime(workflow, workflow_input, executor, custom_worker, options)
3132 .await?
3133 {
3134 return Ok(output);
3135 }
3136
3137 let parent_trace_context = trace_context_from_options(options);
3138 let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());
3139
3140 let tracer = workflow_tracer();
3141 let mut workflow_span_context: Option<TraceContext> = None;
3142 let mut workflow_span = if telemetry_context.sampled {
3143 let (span_context, mut span) = tracer.start_span(
3144 "workflow.run",
3145 SpanKind::Workflow,
3146 parent_trace_context.as_ref(),
3147 );
3148 if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
3149 span.set_attribute("trace_id", trace_id_value);
3150 }
3151 apply_trace_tenant_attributes(span.as_mut(), options);
3152 workflow_span_context = Some(span_context);
3153 Some(span)
3154 } else {
3155 None
3156 };
3157
3158 if workflow.nodes.is_empty() {
3159 return Err(YamlWorkflowRunError::EmptyNodes {
3160 workflow_id: workflow.id.clone(),
3161 });
3162 }
3163
3164 let node_map: HashMap<&str, &YamlNode> = workflow
3165 .nodes
3166 .iter()
3167 .map(|node| (node.id.as_str(), node))
3168 .collect();
3169 if !node_map.contains_key(workflow.entry_node.as_str()) {
3170 return Err(YamlWorkflowRunError::MissingEntry {
3171 entry_node: workflow.entry_node.clone(),
3172 });
3173 }
3174
3175 let edge_map: HashMap<&str, &str> = workflow
3176 .edges
3177 .iter()
3178 .map(|edge| (edge.from.as_str(), edge.to.as_str()))
3179 .collect();
3180
3181 let mut current = workflow.entry_node.clone();
3182 let mut trace = Vec::new();
3183 let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
3184 let mut globals = serde_json::Map::new();
3185 let mut step_timings = Vec::new();
3186 let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
3187 let mut llm_node_models: BTreeMap<String, String> = BTreeMap::new();
3188 let mut token_totals = YamlTokenTotals::default();
3189 let mut workflow_ttft_ms: Option<u128> = None;
3190 let started = Instant::now();
3191
3192 if let Some(sink) = event_sink {
3193 sink.emit(&YamlWorkflowEvent {
3194 event_type: "workflow_started".to_string(),
3195 node_id: None,
3196 step_id: None,
3197 node_kind: None,
3198 streamable: None,
3199 message: Some(format!("workflow_id={}", workflow.id)),
3200 delta: None,
3201 token_kind: None,
3202 is_terminal_node_token: None,
3203 elapsed_ms: Some(0),
3204 metadata: None,
3205 });
3206 }
3207
3208 if event_sink_is_cancelled(event_sink) {
3209 return Err(YamlWorkflowRunError::EventSinkCancelled {
3210 message: workflow_event_sink_cancelled_message().to_string(),
3211 });
3212 }
3213
3214 loop {
3215 if event_sink_is_cancelled(event_sink) {
3216 return Err(YamlWorkflowRunError::EventSinkCancelled {
3217 message: workflow_event_sink_cancelled_message().to_string(),
3218 });
3219 }
3220
3221 let node =
3222 *node_map
3223 .get(current.as_str())
3224 .ok_or_else(|| YamlWorkflowRunError::MissingNode {
3225 node_id: current.clone(),
3226 })?;
3227
3228 trace.push(node.id.clone());
3229 let step_started = Instant::now();
3230
3231 let mut node_span_context: Option<TraceContext> = None;
3232 let mut node_span = if telemetry_context.sampled {
3233 let (span_context, mut span) = tracer.start_span(
3234 "workflow.node.execute",
3235 SpanKind::Node,
3236 workflow_span_context.as_ref(),
3237 );
3238 node_span_context = Some(span_context);
3239 span.set_attribute(
3240 "trace_id",
3241 telemetry_context.trace_id.as_deref().unwrap_or_default(),
3242 );
3243 span.set_attribute("node_id", node.id.as_str());
3244 span.set_attribute("node_kind", node.kind_name());
3245 Some(span)
3246 } else {
3247 None
3248 };
3249
3250 let node_streamable = node
3251 .node_type
3252 .llm_call
3253 .as_ref()
3254 .map(|llm| llm.stream.unwrap_or(false) && !llm.heal.unwrap_or(false));
3255 let workflow_elapsed_before_node_ms = started.elapsed().as_millis();
3256
3257 if let Some(sink) = event_sink {
3258 sink.emit(&YamlWorkflowEvent {
3259 event_type: "node_started".to_string(),
3260 node_id: Some(node.id.clone()),
3261 step_id: Some(node.id.clone()),
3262 node_kind: Some(node.kind_name().to_string()),
3263 streamable: node_streamable,
3264 message: if node_streamable == Some(false) {
3265 Some("Node is not streamable; status events only".to_string())
3266 } else {
3267 None
3268 },
3269 delta: None,
3270 token_kind: None,
3271 is_terminal_node_token: None,
3272 elapsed_ms: Some(workflow_elapsed_before_node_ms),
3273 metadata: None,
3274 });
3275 }
3276
3277 if event_sink_is_cancelled(event_sink) {
3278 return Err(YamlWorkflowRunError::EventSinkCancelled {
3279 message: workflow_event_sink_cancelled_message().to_string(),
3280 });
3281 }
3282
3283 let mut node_usage: Option<YamlLlmTokenUsage> = None;
3284 let mut node_model_name: Option<String> = None;
3285 let is_terminal_node = !edge_map.contains_key(node.id.as_str());
3286 let next = if let Some(llm) = &node.node_type.llm_call {
3287 let prompt_template = node
3288 .config
3289 .as_ref()
3290 .and_then(|cfg| cfg.prompt.as_deref())
3291 .unwrap_or_default();
3292 let context = json!({
3293 "input": workflow_input,
3294 "nodes": outputs,
3295 "globals": Value::Object(globals.clone())
3296 });
3297 let messages = if let Some(path) = llm.messages_path.as_deref() {
3298 Some(
3299 parse_messages_from_context(path, &context).map_err(|message| {
3300 YamlWorkflowRunError::Llm {
3301 node_id: node.id.clone(),
3302 message,
3303 }
3304 })?,
3305 )
3306 } else {
3307 None
3308 };
3309 let prompt_bindings = collect_template_bindings(prompt_template, &context);
3310 let prompt = interpolate_template(prompt_template, &context);
3311 let schema = llm_output_schema_for_node(node);
3312
3313 let request = YamlLlmExecutionRequest {
3314 node_id: node.id.clone(),
3315 is_terminal_node,
3316 stream_json_as_text: llm.stream_json_as_text.unwrap_or(false),
3317 model: resolve_requested_model(options.model.as_deref(), &llm.model),
3318 messages,
3319 append_prompt_as_user: llm.append_prompt_as_user.unwrap_or(true),
3320 prompt,
3321 prompt_template: prompt_template.to_string(),
3322 prompt_bindings,
3323 schema,
3324 stream: llm.stream.unwrap_or(false),
3325 heal: llm.heal.unwrap_or(false),
3326 tools: normalize_llm_tools(llm).map_err(|message| YamlWorkflowRunError::Llm {
3327 node_id: node.id.clone(),
3328 message,
3329 })?,
3330 tool_choice: normalize_tool_choice(llm.tool_choice.clone()).map_err(|message| {
3331 YamlWorkflowRunError::Llm {
3332 node_id: node.id.clone(),
3333 message,
3334 }
3335 })?,
3336 max_tool_roundtrips: llm.max_tool_roundtrips.unwrap_or(1),
3337 tool_calls_global_key: llm.tool_calls_global_key.clone(),
3338 tool_trace_mode: options.telemetry.tool_trace_mode,
3339 execution_context: context.clone(),
3340 email_text: email_text.to_string(),
3341 trace_id: telemetry_context.trace_id.clone(),
3342 trace_context: node_span_context.clone(),
3343 trace_sampled: telemetry_context.sampled,
3344 };
3345
3346 if let Some(span) = node_span.as_mut() {
3347 span.set_attribute(
3348 "node_input",
3349 payload_for_span(options.telemetry.payload_mode, &context).as_str(),
3350 );
3351 }
3352
3353 if let Some(sink) = event_sink {
3354 sink.emit(&YamlWorkflowEvent {
3355 event_type: "node_llm_input_resolved".to_string(),
3356 node_id: Some(node.id.clone()),
3357 step_id: Some(node.id.clone()),
3358 node_kind: Some("llm_call".to_string()),
3359 streamable: Some(request.stream),
3360 message: Some("resolved llm input for telemetry".to_string()),
3361 delta: None,
3362 token_kind: None,
3363 is_terminal_node_token: None,
3364 elapsed_ms: Some(started.elapsed().as_millis()),
3365 metadata: Some(json!({
3366 "model": request.model.clone(),
3367 "stream_requested": request.stream,
3368 "stream_json_as_text": request.stream_json_as_text,
3369 "heal_requested": request.heal,
3370 "effective_stream": request.stream,
3371 "prompt_template": request.prompt_template.clone(),
3372 "prompt": request.prompt.clone(),
3373 "schema": request.schema.clone(),
3374 "bindings": request.prompt_bindings.clone(),
3375 "tools_count": request.tools.len(),
3376 "max_tool_roundtrips": request.max_tool_roundtrips,
3377 })),
3378 });
3379 }
3380
3381 node_model_name = Some(request.model.clone());
3382 llm_node_models.insert(node.id.clone(), request.model.clone());
3383
3384 if event_sink_is_cancelled(event_sink) {
3385 return Err(YamlWorkflowRunError::EventSinkCancelled {
3386 message: workflow_event_sink_cancelled_message().to_string(),
3387 });
3388 }
3389
3390 let llm_result = executor
3391 .complete_structured(request, event_sink)
3392 .await
3393 .map_err(|message| YamlWorkflowRunError::Llm {
3394 node_id: node.id.clone(),
3395 message,
3396 })?;
3397
3398 if let Some(usage) = llm_result.usage.as_ref() {
3399 token_totals.add_usage(usage);
3400 }
3401 if workflow_ttft_ms.is_none() {
3402 workflow_ttft_ms = llm_result
3403 .ttft_ms
3404 .map(|node_ttft_ms| workflow_elapsed_before_node_ms + node_ttft_ms);
3405 }
3406 node_usage = llm_result.usage;
3407
3408 let payload = llm_result.payload;
3409 let tool_calls = llm_result.tool_calls;
3410
3411 let mut node_output = json!({ "output": payload });
3412 if !tool_calls.is_empty() {
3413 if let Some(output_obj) = node_output.as_object_mut() {
3414 output_obj.insert("tool_calls".to_string(), json!(tool_calls));
3415 }
3416 }
3417 outputs.insert(node.id.clone(), node_output);
3418 if let Some(span) = node_span.as_mut() {
3419 if let Some(output_payload) = outputs.get(node.id.as_str()) {
3420 span.set_attribute(
3421 "node_output",
3422 payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
3423 );
3424 }
3425 }
3426 apply_set_globals(node, &outputs, workflow_input, &mut globals);
3427 apply_update_globals(node, &outputs, workflow_input, &mut globals);
3428 if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
3429 if let Some(node_tool_calls) = outputs
3430 .get(node.id.as_str())
3431 .and_then(|value| value.get("tool_calls"))
3432 .cloned()
3433 {
3434 globals.insert(global_key.clone(), node_tool_calls);
3435 }
3436 }
3437 edge_map
3438 .get(node.id.as_str())
3439 .map(|value| value.to_string())
3440 } else if let Some(switch) = &node.node_type.switch {
3441 let context = json!({
3442 "input": workflow_input,
3443 "nodes": outputs,
3444 "globals": Value::Object(globals.clone())
3445 });
3446 let mut chosen = Some(switch.default.clone());
3447 for branch in &switch.branches {
3448 if evaluate_switch_condition(branch.condition.as_str(), &context)? {
3449 chosen = Some(branch.target.clone());
3450 break;
3451 }
3452 }
3453 let chosen = chosen.ok_or_else(|| YamlWorkflowRunError::InvalidSwitchTarget {
3454 node_id: node.id.clone(),
3455 })?;
3456 Some(chosen)
3457 } else if let Some(custom) = &node.node_type.custom_worker {
3458 let payload = node
3459 .config
3460 .as_ref()
3461 .and_then(|cfg| cfg.payload.as_ref())
3462 .cloned()
3463 .unwrap_or_else(|| json!({}));
3464 let context = json!({
3465 "input": workflow_input,
3466 "nodes": outputs,
3467 "globals": Value::Object(globals.clone())
3468 });
3469
3470 if let Some(span) = node_span.as_mut() {
3471 span.set_attribute("handler_name", custom.handler.as_str());
3472 span.set_attribute(
3473 "node_input",
3474 payload_for_span(options.telemetry.payload_mode, &payload).as_str(),
3475 );
3476 }
3477
3478 let mut handler_span_context: Option<TraceContext> = None;
3479 let mut handler_span = if telemetry_context.sampled {
3480 let (span_context, mut span) = tracer.start_span(
3481 "handler.invoke",
3482 SpanKind::Node,
3483 workflow_span_context.as_ref(),
3484 );
3485 handler_span_context = Some(span_context);
3486 span.set_attribute(
3487 "trace_id",
3488 telemetry_context.trace_id.as_deref().unwrap_or_default(),
3489 );
3490 span.set_attribute("handler_name", custom.handler.as_str());
3491 apply_trace_tenant_attributes(span.as_mut(), options);
3492 Some(span)
3493 } else {
3494 None
3495 };
3496
3497 let worker_trace_context = merged_trace_context_for_worker(
3498 handler_span_context.as_ref(),
3499 telemetry_context.trace_id.as_deref(),
3500 options,
3501 );
3502 let worker_context = custom_worker_context_with_trace(
3503 &context,
3504 &worker_trace_context,
3505 &options.trace.tenant,
3506 );
3507
3508 let worker_output_result = if let Some(custom_worker_executor) = custom_worker {
3509 custom_worker_executor
3510 .execute(
3511 custom.handler.as_str(),
3512 &payload,
3513 email_text,
3514 &worker_context,
3515 )
3516 .await
3517 .map_err(|message| YamlWorkflowRunError::CustomWorker {
3518 node_id: node.id.clone(),
3519 message,
3520 })
3521 } else {
3522 mock_custom_worker_output(custom.handler.as_str(), &payload)
3523 };
3524
3525 if let Some(span) = handler_span.take() {
3526 span.end();
3527 }
3528
3529 let worker_output = worker_output_result?;
3530
3531 outputs.insert(node.id.clone(), json!({ "output": worker_output }));
3532 if let Some(span) = node_span.as_mut() {
3533 if let Some(output_payload) = outputs.get(node.id.as_str()) {
3534 span.set_attribute(
3535 "node_output",
3536 payload_for_span(options.telemetry.payload_mode, output_payload).as_str(),
3537 );
3538 }
3539 }
3540 apply_set_globals(node, &outputs, workflow_input, &mut globals);
3541 apply_update_globals(node, &outputs, workflow_input, &mut globals);
3542 edge_map
3543 .get(node.id.as_str())
3544 .map(|value| value.to_string())
3545 } else {
3546 return Err(YamlWorkflowRunError::UnsupportedNodeType {
3547 node_id: node.id.clone(),
3548 });
3549 };
3550
3551 let node_kind = node.kind_name().to_string();
3552 let elapsed_ms = step_started.elapsed().as_millis();
3553 step_timings.push(YamlStepTiming {
3554 node_id: node.id.clone(),
3555 node_kind,
3556 model_name: node_model_name,
3557 elapsed_ms,
3558 prompt_tokens: node_usage.as_ref().map(|usage| usage.prompt_tokens),
3559 completion_tokens: node_usage.as_ref().map(|usage| usage.completion_tokens),
3560 total_tokens: node_usage.as_ref().map(|usage| usage.total_tokens),
3561 reasoning_tokens: node_usage.as_ref().and_then(|usage| usage.reasoning_tokens),
3562 tokens_per_second: node_usage
3563 .as_ref()
3564 .map(|usage| completion_tokens_per_second(usage.completion_tokens, elapsed_ms)),
3565 });
3566
3567 if let Some(usage) = node_usage.as_ref() {
3568 llm_node_metrics.insert(
3569 node.id.clone(),
3570 YamlLlmNodeMetrics {
3571 elapsed_ms,
3572 prompt_tokens: usage.prompt_tokens,
3573 completion_tokens: usage.completion_tokens,
3574 total_tokens: usage.total_tokens,
3575 reasoning_tokens: usage.reasoning_tokens,
3576 tokens_per_second: completion_tokens_per_second(
3577 usage.completion_tokens,
3578 elapsed_ms,
3579 ),
3580 },
3581 );
3582 }
3583
3584 if let Some(mut span) = node_span.take() {
3585 span.set_attribute("elapsed_ms", elapsed_ms.to_string().as_str());
3586 span.add_event("node_completed");
3587 span.end();
3588 }
3589
3590 if let Some(sink) = event_sink {
3591 sink.emit(&YamlWorkflowEvent {
3592 event_type: "node_completed".to_string(),
3593 node_id: Some(node.id.clone()),
3594 step_id: Some(node.id.clone()),
3595 node_kind: Some(node.kind_name().to_string()),
3596 streamable: node_streamable,
3597 message: None,
3598 delta: None,
3599 token_kind: None,
3600 is_terminal_node_token: None,
3601 elapsed_ms: Some(elapsed_ms),
3602 metadata: None,
3603 });
3604 }
3605
3606 if event_sink_is_cancelled(event_sink) {
3607 return Err(YamlWorkflowRunError::EventSinkCancelled {
3608 message: workflow_event_sink_cancelled_message().to_string(),
3609 });
3610 }
3611
3612 if let Some(next) = next {
3613 current = next;
3614 continue;
3615 }
3616 break;
3617 }
3618
3619 let terminal_node = trace
3620 .last()
3621 .cloned()
3622 .ok_or_else(|| YamlWorkflowRunError::EmptyNodes {
3623 workflow_id: workflow.id.clone(),
3624 })?;
3625
3626 let terminal_output = outputs
3627 .get(terminal_node.as_str())
3628 .and_then(|value| value.get("output"))
3629 .cloned();
3630
3631 let total_elapsed_ms = started.elapsed().as_millis();
3632 let output = YamlWorkflowRunOutput {
3633 workflow_id: workflow.id.clone(),
3634 entry_node: workflow.entry_node.clone(),
3635 email_text: email_text.to_string(),
3636 trace,
3637 outputs,
3638 terminal_node,
3639 terminal_output,
3640 step_timings,
3641 llm_node_metrics,
3642 llm_node_models,
3643 total_elapsed_ms,
3644 ttft_ms: workflow_ttft_ms,
3645 total_input_tokens: token_totals.input_tokens,
3646 total_output_tokens: token_totals.output_tokens,
3647 total_tokens: token_totals.total_tokens,
3648 total_reasoning_tokens: token_totals.reasoning_tokens,
3649 tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
3650 trace_id: telemetry_context.trace_id.clone(),
3651 metadata: telemetry_context.trace_id.as_ref().map(|value| {
3652 workflow_metadata_with_trace(
3653 options,
3654 value,
3655 telemetry_context.sampled,
3656 telemetry_context.trace_id_source,
3657 )
3658 }),
3659 };
3660
3661 if let Some(sink) = event_sink {
3662 let event_metadata = if options.telemetry.nerdstats {
3663 Some(json!({
3664 "nerdstats": workflow_nerdstats(&output),
3665 }))
3666 } else {
3667 None
3668 };
3669 sink.emit(&YamlWorkflowEvent {
3670 event_type: "workflow_completed".to_string(),
3671 node_id: None,
3672 step_id: None,
3673 node_kind: None,
3674 streamable: None,
3675 message: Some(format!("terminal_node={}", output.terminal_node)),
3676 delta: None,
3677 token_kind: None,
3678 is_terminal_node_token: None,
3679 elapsed_ms: Some(output.total_elapsed_ms),
3680 metadata: event_metadata,
3681 });
3682 }
3683
3684 if event_sink_is_cancelled(event_sink) {
3685 return Err(YamlWorkflowRunError::EventSinkCancelled {
3686 message: workflow_event_sink_cancelled_message().to_string(),
3687 });
3688 }
3689
3690 if let Some(mut span) = workflow_span.take() {
3691 span.set_attribute("workflow_id", workflow.id.as_str());
3692 if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
3693 span.set_attribute("trace_id", trace_id_value.as_str());
3694 }
3695 span.end();
3696 flush_workflow_tracer();
3697 }
3698
3699 Ok(output)
3700}
3701
/// Attempts to execute a YAML workflow through the generic IR
/// `WorkflowRuntime` instead of the legacy YAML interpreter.
///
/// Returns `Ok(None)` when the IR path cannot handle the workflow — either
/// lowering fails with an unsupported-node / multiple-outgoing-edge error, or
/// the runtime rejects the IR during validation — signalling the caller to
/// fall back to the legacy interpreter. All other failures are hard errors.
///
/// LLM nodes are bridged through the synthetic tool id `YAML_LLM_TOOL_ID`
/// so the IR runtime's tool path drives the provided `executor`; any other
/// tool name is dispatched to the optional `custom_worker` by handler name.
async fn try_run_yaml_via_ir_runtime(
    workflow: &YamlWorkflow,
    workflow_input: &Value,
    executor: &dyn YamlWorkflowLlmExecutor,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    options: &YamlWorkflowRunOptions,
) -> Result<Option<YamlWorkflowRunOutput>, YamlWorkflowRunError> {
    // Lower YAML to IR. Structural features the IR cannot express trigger
    // the `Ok(None)` fallback rather than an error; anything else is an
    // invalid-input failure.
    let ir = match yaml_workflow_to_ir(workflow) {
        Ok(def) => def,
        Err(YamlToIrError::UnsupportedNode { .. })
        | Err(YamlToIrError::MultipleOutgoingEdge { .. }) => return Ok(None),
        Err(err) => {
            return Err(YamlWorkflowRunError::InvalidInput {
                message: err.to_string(),
            });
        }
    };

    let parent_trace_context = trace_context_from_options(options);
    let telemetry_context = resolve_telemetry_context(options, parent_trace_context.as_ref());

    // Open the top-level workflow span only when this run is sampled; the
    // span context is captured so child spans (tool/handler) can parent to it.
    let tracer = workflow_tracer();
    let mut workflow_span_context: Option<TraceContext> = None;
    let mut workflow_span = if telemetry_context.sampled {
        let (span_context, mut span) = tracer.start_span(
            "workflow.run",
            SpanKind::Workflow,
            parent_trace_context.as_ref(),
        );
        if let Some(trace_id_value) = telemetry_context.trace_id.as_deref() {
            span.set_attribute("trace_id", trace_id_value);
        }
        apply_trace_tenant_attributes(span.as_mut(), options);
        workflow_span_context = Some(span_context);
        Some(span)
    } else {
        None
    };

    // The IR produced for YAML workflows routes every LLM call through the
    // tool executor (see `YAML_LLM_TOOL_ID` handling below), so a direct LLM
    // execution via this executor indicates a lowering bug and always errors.
    struct NoopLlm;
    #[async_trait]
    impl LlmExecutor for NoopLlm {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Err(LlmExecutionError::UnexpectedOutcome(
                "yaml_ir_uses_tool_path",
            ))
        }
    }

    // Tool executor bridging IR tool calls back to the YAML-level LLM
    // executor and custom-worker executor. Mutexes allow metric accumulation
    // through the `&self` receiver required by the `ToolExecutor` trait.
    struct YamlIrToolExecutor<'a> {
        llm_executor: &'a dyn YamlWorkflowLlmExecutor,
        custom_worker: Option<&'a dyn YamlWorkflowCustomWorkerExecutor>,
        // Aggregated token usage across all LLM calls in this run.
        token_totals: std::sync::Mutex<YamlTokenTotals>,
        // Per-node token usage, keyed by YAML node id.
        node_usage: std::sync::Mutex<BTreeMap<String, YamlLlmTokenUsage>>,
        // Per-node resolved model name, keyed by YAML node id.
        node_models: std::sync::Mutex<BTreeMap<String, String>>,
        model_override: Option<String>,
        trace_id: Option<String>,
        trace_context: Option<TraceContext>,
        trace_input_context: Option<YamlWorkflowTraceContextInput>,
        tenant_context: YamlWorkflowTraceTenantContext,
        payload_mode: YamlWorkflowPayloadMode,
        trace_sampled: bool,
    }

    #[async_trait]
    impl ToolExecutor for YamlIrToolExecutor<'_> {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // Rebuild the `{input, nodes, globals}` template context from the
            // IR runtime's scoped input.
            let context = build_yaml_context_from_ir_scope(&input.scoped_input);

            // Synthetic LLM tool: reconstruct a YamlLlmExecutionRequest from
            // the tool-call payload and delegate to the YAML LLM executor.
            if input.tool == YAML_LLM_TOOL_ID {
                let node_id = input
                    .input
                    .get("node_id")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing node_id".to_string())
                    })?
                    .to_string();
                let node_id_for_metrics = node_id.clone();
                let model = input
                    .input
                    .get("model")
                    .and_then(Value::as_str)
                    .ok_or_else(|| {
                        ToolExecutionError::Failed("yaml llm call missing model".to_string())
                    })?
                    .to_string();
                // A run-level model override (from options) wins over the
                // node's declared model.
                let resolved_model =
                    resolve_requested_model(self.model_override.as_deref(), &model);
                let prompt_template = input
                    .input
                    .get("prompt_template")
                    .and_then(Value::as_str)
                    .unwrap_or_default()
                    .to_string();
                let stream = input
                    .input
                    .get("stream")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let heal = input
                    .input
                    .get("heal")
                    .and_then(Value::as_bool)
                    .unwrap_or(false);
                let append_prompt_as_user = input
                    .input
                    .get("append_prompt_as_user")
                    .and_then(Value::as_bool)
                    .unwrap_or(true);
                let messages_path = input
                    .input
                    .get("messages_path")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // Optional explicit message list resolved from the context.
                let messages = if let Some(path) = messages_path.as_deref() {
                    Some(
                        parse_messages_from_context(path, &context)
                            .map_err(ToolExecutionError::Failed)?,
                    )
                } else {
                    None
                };

                let prompt_bindings = collect_template_bindings(&prompt_template, &context);
                let prompt = interpolate_template(&prompt_template, &context);
                let email_text = context
                    .get("input")
                    .and_then(|v| v.get("email_text"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let schema = input
                    .input
                    .get("output_schema")
                    .cloned()
                    .unwrap_or_else(default_llm_output_schema);

                // NOTE(review): the IR path always marks the call as
                // non-terminal and disables tools/healing roundtrips beyond
                // one — presumably because the IR runtime owns control flow.
                let request = YamlLlmExecutionRequest {
                    node_id,
                    is_terminal_node: false,
                    stream_json_as_text: input
                        .input
                        .get("stream_json_as_text")
                        .and_then(Value::as_bool)
                        .unwrap_or(false),
                    model: resolved_model.clone(),
                    messages,
                    append_prompt_as_user,
                    prompt,
                    prompt_template,
                    prompt_bindings,
                    schema,
                    stream,
                    heal,
                    tools: Vec::new(),
                    tool_choice: None,
                    max_tool_roundtrips: 1,
                    tool_calls_global_key: None,
                    tool_trace_mode: YamlToolTraceMode::Off,
                    execution_context: context.clone(),
                    email_text: email_text.to_string(),
                    trace_id: self.trace_id.clone(),
                    trace_context: self.trace_context.clone(),
                    trace_sampled: self.trace_sampled,
                };

                let llm_result = self
                    .llm_executor
                    .complete_structured(request, None)
                    .await
                    .map_err(ToolExecutionError::Failed);

                // Record metrics only on success; lock failures (poisoned
                // mutex) are silently skipped so metrics never fail the run.
                if let Ok(ref result) = llm_result {
                    if let Some(usage) = result.usage.as_ref() {
                        if let Ok(mut totals) = self.token_totals.lock() {
                            totals.add_usage(usage);
                        }
                        if let Ok(mut usage_map) = self.node_usage.lock() {
                            usage_map.insert(node_id_for_metrics.clone(), usage.clone());
                        }
                    }
                    if let Ok(mut model_map) = self.node_models.lock() {
                        model_map.insert(node_id_for_metrics, resolved_model);
                    }
                }

                return llm_result.map(|result| result.payload);
            }

            // Any other tool name is a custom-worker handler; missing worker
            // executor maps to a NotFound error for this tool.
            let worker = self
                .custom_worker
                .ok_or_else(|| ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                })?;

            let payload = input.input.clone();
            let email_text = context
                .get("input")
                .and_then(|v| v.get("email_text"))
                .and_then(Value::as_str)
                .unwrap_or_default();

            // Per-handler span, parented to the workflow span context and
            // annotated with tenant attributes when sampled.
            let tracer = workflow_tracer();
            let mut handler_span_context: Option<TraceContext> = None;
            let mut handler_span = if self.trace_sampled {
                let (span_context, mut span) = tracer.start_span(
                    "handler.invoke",
                    SpanKind::Node,
                    self.trace_context.as_ref(),
                );
                handler_span_context = Some(span_context);
                if let Some(trace_id) = self.trace_id.as_ref() {
                    span.set_attribute("trace_id", trace_id.as_str());
                }
                span.set_attribute("handler_name", input.tool.as_str());
                if let Some(workspace_id) = self.tenant_context.workspace_id.as_deref() {
                    span.set_attribute("tenant.workspace_id", workspace_id);
                }
                if let Some(user_id) = self.tenant_context.user_id.as_deref() {
                    span.set_attribute("tenant.user_id", user_id);
                }
                if let Some(conversation_id) = self.tenant_context.conversation_id.as_deref() {
                    span.set_attribute("tenant.conversation_id", conversation_id);
                }
                if let Some(request_id) = self.tenant_context.request_id.as_deref() {
                    span.set_attribute("tenant.request_id", request_id);
                }
                if let Some(run_id) = self.tenant_context.run_id.as_deref() {
                    span.set_attribute("tenant.run_id", run_id);
                }
                span.set_attribute(
                    "node_input",
                    payload_for_span(self.payload_mode, &payload).as_str(),
                );
                Some(span)
            } else {
                None
            };

            // Reassemble a minimal options struct so the shared trace-context
            // helpers can be reused from inside the executor.
            let trace_options = YamlWorkflowRunOptions {
                telemetry: YamlWorkflowTelemetryConfig::default(),
                trace: YamlWorkflowTraceOptions {
                    context: self.trace_input_context.clone(),
                    tenant: self.tenant_context.clone(),
                },
                model: None,
            };
            let worker_trace_context = merged_trace_context_for_worker(
                handler_span_context.as_ref(),
                self.trace_id.as_deref(),
                &trace_options,
            );
            let worker_context = custom_worker_context_with_trace(
                &context,
                &worker_trace_context,
                &self.tenant_context,
            );

            let output_result = worker
                .execute(&input.tool, &payload, email_text, &worker_context)
                .await
                .map_err(ToolExecutionError::Failed);

            // Mark outcome on the span before ending it, regardless of result.
            if let Some(span) = handler_span.as_mut() {
                if output_result.is_ok() {
                    span.add_event("handler.success");
                } else {
                    span.add_event("handler.error");
                }
            }

            if let Some(span) = handler_span.take() {
                span.end();
            }

            output_result
        }
    }

    let tool_executor = YamlIrToolExecutor {
        llm_executor: executor,
        custom_worker,
        token_totals: std::sync::Mutex::new(YamlTokenTotals::default()),
        node_usage: std::sync::Mutex::new(BTreeMap::new()),
        node_models: std::sync::Mutex::new(BTreeMap::new()),
        model_override: options.model.clone(),
        trace_id: telemetry_context.trace_id.clone(),
        trace_context: workflow_span_context.clone(),
        trace_input_context: options.trace.context.clone(),
        tenant_context: options.trace.tenant.clone(),
        payload_mode: options.telemetry.payload_mode,
        trace_sampled: telemetry_context.sampled,
    };

    let runtime = WorkflowRuntime::new(
        ir,
        &NoopLlm,
        Some(&tool_executor),
        WorkflowRuntimeOptions::default(),
    );

    // Execute the IR. A validation failure means the IR path cannot handle
    // this workflow — fall back (`Ok(None)`); other errors are surfaced.
    let started = Instant::now();
    let result = match runtime.execute(workflow_input.clone(), None).await {
        Ok(result) => result,
        Err(WorkflowRuntimeError::Validation(_)) => return Ok(None),
        Err(error) => {
            return Err(YamlWorkflowRunError::IrRuntime {
                message: error.to_string(),
            });
        }
    };
    let total_elapsed_ms = started.elapsed().as_millis();

    // Re-wrap node outputs in the `{"output": ...}` shape the YAML path
    // uses, dropping the synthetic start node.
    let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
    for (node_id, output) in result.node_outputs {
        if node_id == YAML_START_NODE_ID {
            continue;
        }
        outputs.insert(node_id, json!({"output": output}));
    }

    // Rebuild the execution trace and per-step timings from the runtime's
    // execution records, merging in the usage/model maps accumulated by the
    // tool executor.
    let mut trace = Vec::new();
    let mut step_timings = Vec::new();
    let node_usage_map = tool_executor
        .node_usage
        .lock()
        .map(|usage| usage.clone())
        .unwrap_or_default();
    let llm_node_models = tool_executor
        .node_models
        .lock()
        .map(|models| models.clone())
        .unwrap_or_default();
    let mut llm_node_metrics: BTreeMap<String, YamlLlmNodeMetrics> = BTreeMap::new();
    for execution in result.node_executions {
        if execution.node_id == YAML_START_NODE_ID {
            continue;
        }
        let node_id = execution.node_id;
        trace.push(node_id.clone());
        let usage = node_usage_map.get(&node_id);
        if let Some(usage) = usage {
            // Per-node wall time is not tracked on the IR path, so
            // elapsed_ms is reported as 0 here.
            llm_node_metrics.insert(
                node_id.clone(),
                YamlLlmNodeMetrics {
                    elapsed_ms: 0,
                    prompt_tokens: usage.prompt_tokens,
                    completion_tokens: usage.completion_tokens,
                    total_tokens: usage.total_tokens,
                    reasoning_tokens: usage.reasoning_tokens,
                    tokens_per_second: completion_tokens_per_second(usage.completion_tokens, 0),
                },
            );
        }
        step_timings.push(YamlStepTiming {
            node_id: node_id.clone(),
            node_kind: "ir_runtime".to_string(),
            model_name: llm_node_models.get(&node_id).cloned(),
            elapsed_ms: 0,
            prompt_tokens: usage.map(|value| value.prompt_tokens),
            completion_tokens: usage.map(|value| value.completion_tokens),
            total_tokens: usage.map(|value| value.total_tokens),
            reasoning_tokens: usage.and_then(|value| value.reasoning_tokens),
            tokens_per_second: usage
                .map(|value| completion_tokens_per_second(value.completion_tokens, 0)),
        });
    }

    let terminal_node = result.terminal_node_id;
    let terminal_output = outputs
        .get(&terminal_node)
        .and_then(|v| v.get("output"))
        .cloned();

    // Kept for backward compatibility with the email-centric output shape;
    // empty when the workflow input carries no `email_text` field.
    let email_text = workflow_input
        .get("email_text")
        .and_then(Value::as_str)
        .unwrap_or_default()
        .to_string();

    let token_totals = tool_executor
        .token_totals
        .lock()
        .map(|totals| totals.clone())
        .unwrap_or_default();

    // Close the workflow span and flush the tracer before returning.
    if let Some(mut span) = workflow_span.take() {
        span.set_attribute("workflow_id", workflow.id.as_str());
        if let Some(trace_id_value) = telemetry_context.trace_id.as_ref() {
            span.set_attribute("trace_id", trace_id_value.as_str());
        }
        span.end();
        flush_workflow_tracer();
    }

    Ok(Some(YamlWorkflowRunOutput {
        workflow_id: workflow.id.clone(),
        entry_node: workflow.entry_node.clone(),
        email_text,
        trace,
        outputs,
        terminal_node,
        terminal_output,
        step_timings,
        llm_node_metrics,
        llm_node_models,
        total_elapsed_ms,
        // Time-to-first-token is not measured on the IR path.
        ttft_ms: None,
        total_input_tokens: token_totals.input_tokens,
        total_output_tokens: token_totals.output_tokens,
        total_tokens: token_totals.total_tokens,
        total_reasoning_tokens: token_totals.reasoning_tokens,
        tokens_per_second: token_totals.tokens_per_second(total_elapsed_ms),
        trace_id: telemetry_context.trace_id.clone(),
        metadata: telemetry_context.trace_id.as_ref().map(|value| {
            workflow_metadata_with_trace(
                options,
                value,
                telemetry_context.sampled,
                telemetry_context.trace_id_source,
            )
        }),
    }))
}
4133
4134fn build_yaml_context_from_ir_scope(scoped_input: &Value) -> Value {
4135 let input = scoped_input.get("input").cloned().unwrap_or(Value::Null);
4136
4137 let mut nodes = serde_json::Map::new();
4138 if let Some(node_outputs) = scoped_input.get("node_outputs").and_then(Value::as_object) {
4139 for (node_id, output) in node_outputs {
4140 nodes.insert(node_id.clone(), json!({"output": output.clone()}));
4141 }
4142 }
4143
4144 json!({
4145 "input": input,
4146 "nodes": Value::Object(nodes),
4147 "globals": Value::Object(serde_json::Map::new())
4148 })
4149}
4150
4151pub async fn run_email_workflow_yaml_with_custom_worker_and_events(
4152 workflow: &YamlWorkflow,
4153 email_text: &str,
4154 executor: &dyn YamlWorkflowLlmExecutor,
4155 custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
4156 event_sink: Option<&dyn YamlWorkflowEventSink>,
4157) -> Result<YamlWorkflowRunOutput, YamlWorkflowRunError> {
4158 let workflow_input = json!({ "email_text": email_text });
4159 run_workflow_yaml_with_custom_worker_and_events(
4160 workflow,
4161 &workflow_input,
4162 executor,
4163 custom_worker,
4164 event_sink,
4165 )
4166 .await
4167}
4168
4169fn evaluate_switch_condition(
4170 condition: &str,
4171 context: &Value,
4172) -> Result<bool, YamlWorkflowRunError> {
4173 let (left, right) =
4174 condition
4175 .split_once("==")
4176 .ok_or_else(|| YamlWorkflowRunError::UnsupportedCondition {
4177 condition: condition.to_string(),
4178 })?;
4179
4180 let left_path = left.trim().trim_start_matches("$.");
4181 let right_literal = right.trim().trim_matches('"').trim_matches('\'');
4182 let left_value = resolve_path(context, left_path);
4183 Ok(left_value
4184 .and_then(Value::as_str)
4185 .map(|value| value == right_literal)
4186 .unwrap_or(false))
4187}
4188
4189fn parse_messages_from_context(path: &str, context: &Value) -> Result<Vec<Message>, String> {
4190 let normalized_path = path.trim().trim_start_matches("$.");
4191 let value = resolve_path(context, normalized_path)
4192 .ok_or_else(|| format!("messages_path not found: {path}"))?;
4193 let list: Vec<WorkflowMessage> = serde_json::from_value(value.clone()).map_err(|err| {
4194 format!("messages_path must resolve to a list of messages: {path}; {err}")
4195 })?;
4196 if list.is_empty() {
4197 return Err(format!(
4198 "messages_path must not resolve to an empty list: {path}"
4199 ));
4200 }
4201
4202 let mut messages = Vec::with_capacity(list.len());
4203 for (index, item) in list.into_iter().enumerate() {
4204 let mut message = match item.role {
4205 Role::System => Message::system(item.content),
4206 Role::User => Message::user(item.content),
4207 Role::Assistant => Message::assistant(item.content),
4208 Role::Tool => {
4209 let tool_call_id = item
4210 .tool_call_id
4211 .ok_or_else(|| format!("tool message at index {index} missing tool_call_id"))?;
4212 Message::tool(item.content, tool_call_id)
4213 }
4214 };
4215
4216 if let Some(name) = item.name {
4217 message = message.with_name(name);
4218 }
4219
4220 messages.push(message);
4221 }
4222
4223 Ok(messages)
4224}
4225
/// Statically validates a parsed [`YamlWorkflow`] and returns every problem found.
///
/// Checks performed:
/// - the entry node and both endpoints of every edge refer to declared node ids;
/// - `llm_call` nodes have a non-empty model, a valid `max_tool_roundtrips`
///   (>= 1), a non-empty `tool_calls_global_key` when present, a resolvable
///   `tool_choice`, and well-formed, uniquely named tools with valid schemas;
/// - `switch` branch targets and the default target exist;
/// - `update_globals` entries use a known op and carry `from` when the op
///   requires it.
///
/// Returns an empty vector when every check passes. Most diagnostics are
/// `Error`; the stream+heal combination is only a `Warning` because the
/// runtime degrades gracefully (it disables streaming).
pub fn verify_yaml_workflow(workflow: &YamlWorkflow) -> Vec<YamlWorkflowDiagnostic> {
    let mut diagnostics = Vec::new();
    // Index nodes by id once so the existence checks below are O(1).
    let known_ids: HashMap<&str, &YamlNode> = workflow
        .nodes
        .iter()
        .map(|node| (node.id.as_str(), node))
        .collect();

    // The entry point must be a declared node.
    if !known_ids.contains_key(workflow.entry_node.as_str()) {
        diagnostics.push(YamlWorkflowDiagnostic {
            node_id: None,
            code: "missing_entry".to_string(),
            severity: YamlWorkflowDiagnosticSeverity::Error,
            message: format!("entry node '{}' does not exist", workflow.entry_node),
        });
    }

    // Every edge must connect two declared nodes.
    for edge in &workflow.edges {
        if !known_ids.contains_key(edge.from.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.from.clone()),
                code: "unknown_edge_from".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.from '{}' does not exist", edge.from),
            });
        }
        if !known_ids.contains_key(edge.to.as_str()) {
            diagnostics.push(YamlWorkflowDiagnostic {
                node_id: Some(edge.to.clone()),
                code: "unknown_edge_to".to_string(),
                severity: YamlWorkflowDiagnosticSeverity::Error,
                message: format!("edge.to '{}' does not exist", edge.to),
            });
        }
    }

    for node in &workflow.nodes {
        if let Some(llm) = &node.node_type.llm_call {
            if llm.model.trim().is_empty() {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "empty_model".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.model must not be empty".to_string(),
                });
            }
            // stream + heal cannot both be honored; the runtime disables
            // streaming, so this is surfaced as a warning, not an error.
            if llm.stream.unwrap_or(false) && llm.heal.unwrap_or(false) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "stream_heal_conflict".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Warning,
                    message:
                        "llm_call.stream=true with heal=true is not streamable; runtime will disable streaming"
                            .to_string(),
                });
            }

            // Zero roundtrips would make any tool call unanswerable.
            if llm.max_tool_roundtrips.unwrap_or(1) == 0 {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "invalid_max_tool_roundtrips".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: "llm_call.max_tool_roundtrips must be >= 1".to_string(),
                });
            }

            if let Some(global_key) = llm.tool_calls_global_key.as_ref() {
                if global_key.trim().is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_calls_global_key".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "llm_call.tool_calls_global_key must not be empty".to_string(),
                    });
                }
            }

            // A forced tool_choice must name a tool declared on this node,
            // in whichever declaration format the node uses.
            match normalize_tool_choice(llm.tool_choice.clone()) {
                Ok(choice) => {
                    if let Some(ToolChoice::Tool(choice_tool)) = choice.as_ref() {
                        if !llm.tools.iter().any(|tool| match (llm.tools_format, tool) {
                            (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
                                openai.function.name == choice_tool.function.name
                            }
                            (
                                YamlToolFormat::Simplified,
                                YamlToolDeclaration::Simplified(simple),
                            ) => simple.name == choice_tool.function.name,
                            _ => false,
                        }) {
                            diagnostics.push(YamlWorkflowDiagnostic {
                                node_id: Some(node.id.clone()),
                                code: "unknown_tool_choice_function".to_string(),
                                severity: YamlWorkflowDiagnosticSeverity::Error,
                                message: format!(
                                    "llm_call.tool_choice references unknown function '{}'",
                                    choice_tool.function.name
                                ),
                            });
                        }
                    }
                }
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_choice".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                }
            }

            // On a format mismatch we record the diagnostic and continue with
            // an empty tool list so per-tool checks are simply skipped.
            let normalized_tools = match normalize_llm_tools(llm) {
                Ok(tools) => tools,
                Err(message) => {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tools_format".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message,
                    });
                    Vec::new()
                }
            };

            // Per-tool checks: non-empty unique names, valid input schema
            // (required), valid output schema (optional).
            let mut seen_tool_names = HashSet::new();
            for tool in &normalized_tools {
                let name = tool.definition.function.name.trim();
                if name.is_empty() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "empty_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: "tool function name must not be empty".to_string(),
                    });
                }
                if !seen_tool_names.insert(tool.definition.function.name.clone()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "duplicate_tool_name".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "duplicate tool function name '{}' in node",
                            tool.definition.function.name
                        ),
                    });
                }

                let schema = tool
                    .definition
                    .function
                    .parameters
                    .clone()
                    .unwrap_or(Value::Null);
                if schema.is_null() {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "missing_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' is missing input schema",
                            tool.definition.function.name
                        ),
                    });
                } else if let Err(message) = validate_json_schema(&schema) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "invalid_tool_input_schema".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!(
                            "tool '{}' has invalid input schema: {}",
                            tool.definition.function.name, message
                        ),
                    });
                }

                if let Some(output_schema) = tool.output_schema.as_ref() {
                    if let Err(message) = validate_json_schema(output_schema) {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "invalid_tool_output_schema".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "tool '{}' has invalid output schema: {}",
                                tool.definition.function.name, message
                            ),
                        });
                    }
                }
            }
        }

        // Switch branches and the default must point at declared nodes.
        if let Some(switch) = &node.node_type.switch {
            for branch in &switch.branches {
                if !known_ids.contains_key(branch.target.as_str()) {
                    diagnostics.push(YamlWorkflowDiagnostic {
                        node_id: Some(node.id.clone()),
                        code: "unknown_switch_target".to_string(),
                        severity: YamlWorkflowDiagnosticSeverity::Error,
                        message: format!("switch branch target '{}' does not exist", branch.target),
                    });
                }
            }
            if !known_ids.contains_key(switch.default.as_str()) {
                diagnostics.push(YamlWorkflowDiagnostic {
                    node_id: Some(node.id.clone()),
                    code: "unknown_switch_default".to_string(),
                    severity: YamlWorkflowDiagnosticSeverity::Error,
                    message: format!("switch default target '{}' does not exist", switch.default),
                });
            }
        }

        // update_globals: validate the op name and that `from` is present for
        // every op except `increment` (which uses `by` instead).
        if let Some(config) = node.config.as_ref() {
            if let Some(update_globals) = config.update_globals.as_ref() {
                for (key, update) in update_globals {
                    let is_valid_op =
                        matches!(update.op.as_str(), "set" | "append" | "increment" | "merge");
                    if !is_valid_op {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "unknown_update_op".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' has unknown op '{}'; expected set|append|increment|merge",
                                key, update.op
                            ),
                        });
                    }

                    if update.op != "increment" && update.from.is_none() {
                        diagnostics.push(YamlWorkflowDiagnostic {
                            node_id: Some(node.id.clone()),
                            code: "missing_update_from".to_string(),
                            severity: YamlWorkflowDiagnosticSeverity::Error,
                            message: format!(
                                "update_globals key '{}' with op '{}' requires 'from'",
                                key, update.op
                            ),
                        });
                    }
                }
            }
        }
    }

    diagnostics
}
4474
4475fn resolve_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
4476 path.split('.')
4477 .filter(|segment| !segment.is_empty())
4478 .try_fold(value, |current, segment| {
4479 if let Ok(index) = segment.parse::<usize>() {
4480 return current.get(index);
4481 }
4482 current.get(segment)
4483 })
4484}
4485
4486fn interpolate_template(template: &str, context: &Value) -> String {
4487 let mut out = String::with_capacity(template.len());
4488 let mut rest = template;
4489
4490 loop {
4491 let Some(start) = rest.find("{{") else {
4492 out.push_str(rest);
4493 break;
4494 };
4495
4496 out.push_str(&rest[..start]);
4497 let after_start = &rest[start + 2..];
4498 let Some(end) = after_start.find("}}") else {
4499 out.push_str(&rest[start..]);
4500 break;
4501 };
4502
4503 let expr = after_start[..end].trim();
4504 let source_path = expr.trim_start_matches("$.");
4505 let replacement = resolve_path(context, source_path)
4506 .map(value_to_template_string)
4507 .unwrap_or_default();
4508 out.push_str(replacement.as_str());
4509
4510 rest = &after_start[end + 2..];
4511 }
4512
4513 out
4514}
4515
4516fn collect_template_bindings(template: &str, context: &Value) -> Vec<YamlTemplateBinding> {
4517 let mut bindings = Vec::new();
4518 let mut rest = template;
4519
4520 loop {
4521 let Some(start) = rest.find("{{") else {
4522 break;
4523 };
4524
4525 let after_start = &rest[start + 2..];
4526 let Some(end) = after_start.find("}}") else {
4527 break;
4528 };
4529
4530 let expr = after_start[..end].trim();
4531 let source_path = expr.trim_start_matches("$.").to_string();
4532 let resolved = resolve_path(context, source_path.as_str()).cloned();
4533 let missing = resolved.is_none();
4534 let resolved_value = resolved.unwrap_or(Value::Null);
4535 bindings.push(YamlTemplateBinding {
4536 index: bindings.len(),
4537 expression: expr.to_string(),
4538 source_path,
4539 resolved_type: json_type_name(&resolved_value).to_string(),
4540 missing,
4541 resolved: resolved_value,
4542 });
4543
4544 rest = &after_start[end + 2..];
4545 }
4546
4547 bindings
4548}
4549
4550fn json_type_name(value: &Value) -> &'static str {
4551 match value {
4552 Value::Null => "null",
4553 Value::Bool(_) => "bool",
4554 Value::Number(_) => "number",
4555 Value::String(_) => "string",
4556 Value::Array(_) => "array",
4557 Value::Object(_) => "object",
4558 }
4559}
4560
4561fn value_to_template_string(value: &Value) -> String {
4562 match value {
4563 Value::Null => String::new(),
4564 Value::Bool(v) => v.to_string(),
4565 Value::Number(v) => v.to_string(),
4566 Value::String(v) => v.clone(),
4567 Value::Array(_) | Value::Object(_) => serde_json::to_string(value).unwrap_or_default(),
4568 }
4569}
4570
4571fn apply_set_globals(
4572 node: &YamlNode,
4573 outputs: &BTreeMap<String, Value>,
4574 workflow_input: &Value,
4575 globals: &mut serde_json::Map<String, Value>,
4576) {
4577 let Some(config) = node.config.as_ref() else {
4578 return;
4579 };
4580 let Some(set_globals) = config.set_globals.as_ref() else {
4581 return;
4582 };
4583
4584 let context = json!({
4585 "input": workflow_input,
4586 "nodes": outputs,
4587 "globals": Value::Object(globals.clone())
4588 });
4589
4590 for (key, expr) in set_globals {
4591 let value = resolve_path(&context, expr.as_str())
4592 .cloned()
4593 .unwrap_or(Value::Null);
4594 globals.insert(key.clone(), value);
4595 }
4596}
4597
4598fn apply_update_globals(
4599 node: &YamlNode,
4600 outputs: &BTreeMap<String, Value>,
4601 workflow_input: &Value,
4602 globals: &mut serde_json::Map<String, Value>,
4603) {
4604 let Some(config) = node.config.as_ref() else {
4605 return;
4606 };
4607 let Some(update_globals) = config.update_globals.as_ref() else {
4608 return;
4609 };
4610
4611 let context = json!({
4612 "input": workflow_input,
4613 "nodes": outputs,
4614 "globals": Value::Object(globals.clone())
4615 });
4616
4617 for (key, update) in update_globals {
4618 match update.op.as_str() {
4619 "set" => {
4620 if let Some(path) = update.from.as_ref() {
4621 let value = resolve_path(&context, path.as_str())
4622 .cloned()
4623 .unwrap_or(Value::Null);
4624 globals.insert(key.clone(), value);
4625 }
4626 }
4627 "append" => {
4628 if let Some(path) = update.from.as_ref() {
4629 let value = resolve_path(&context, path.as_str())
4630 .cloned()
4631 .unwrap_or(Value::Null);
4632 let entry = globals
4633 .entry(key.clone())
4634 .or_insert_with(|| Value::Array(Vec::new()));
4635 match entry {
4636 Value::Array(items) => items.push(value),
4637 other => {
4638 let existing = other.clone();
4639 *other = Value::Array(vec![existing, value]);
4640 }
4641 }
4642 }
4643 }
4644 "increment" => {
4645 let by = update.by.unwrap_or(1.0);
4646 let current = globals
4647 .get(key.as_str())
4648 .and_then(Value::as_f64)
4649 .unwrap_or(0.0);
4650 if let Some(next) = serde_json::Number::from_f64(current + by) {
4651 globals.insert(key.clone(), Value::Number(next));
4652 }
4653 }
4654 "merge" => {
4655 if let Some(path) = update.from.as_ref() {
4656 let source = resolve_path(&context, path.as_str())
4657 .cloned()
4658 .unwrap_or(Value::Null);
4659 if let Value::Object(source_map) = source {
4660 let target = globals
4661 .entry(key.clone())
4662 .or_insert_with(|| Value::Object(serde_json::Map::new()));
4663 if let Value::Object(target_map) = target {
4664 target_map.extend(source_map);
4665 } else {
4666 *target = Value::Object(source_map);
4667 }
4668 }
4669 }
4670 }
4671 _ => {}
4672 }
4673 }
4674}
4675
4676fn llm_output_schema_for_node(node: &YamlNode) -> Value {
4677 if let Some(schema) = node
4678 .config
4679 .as_ref()
4680 .and_then(|cfg| cfg.output_schema.clone())
4681 {
4682 return schema;
4683 }
4684
4685 default_llm_output_schema()
4686}
4687
4688fn normalize_tool_choice(
4689 config: Option<YamlToolChoiceConfig>,
4690) -> Result<Option<ToolChoice>, String> {
4691 let Some(config) = config else {
4692 return Ok(None);
4693 };
4694
4695 let choice = match config {
4696 YamlToolChoiceConfig::Mode(mode) => ToolChoice::Mode(mode),
4697 YamlToolChoiceConfig::Function { function } => ToolChoice::Tool(ToolChoiceTool {
4698 tool_type: ToolType::Function,
4699 function: ToolChoiceFunction { name: function },
4700 }),
4701 YamlToolChoiceConfig::OpenAi(tool) => ToolChoice::Tool(tool),
4702 };
4703
4704 Ok(Some(choice))
4705}
4706
4707fn normalize_llm_tools(llm: &YamlLlmCall) -> Result<Vec<YamlResolvedTool>, String> {
4708 llm.tools
4709 .iter()
4710 .map(|tool| match (llm.tools_format, tool) {
4711 (YamlToolFormat::Openai, YamlToolDeclaration::OpenAi(openai)) => {
4712 let definition = ToolDefinition {
4713 tool_type: openai.tool_type.unwrap_or(ToolType::Function),
4714 function: ToolFunction {
4715 name: openai.function.name.clone(),
4716 description: openai.function.description.clone(),
4717 parameters: openai.function.parameters.clone(),
4718 },
4719 };
4720 Ok(YamlResolvedTool {
4721 definition,
4722 output_schema: openai.function.output_schema.clone(),
4723 })
4724 }
4725 (YamlToolFormat::Simplified, YamlToolDeclaration::Simplified(simple)) => {
4726 let definition = ToolDefinition {
4727 tool_type: ToolType::Function,
4728 function: ToolFunction {
4729 name: simple.name.clone(),
4730 description: simple.description.clone(),
4731 parameters: Some(simple.input_schema.clone()),
4732 },
4733 };
4734 Ok(YamlResolvedTool {
4735 definition,
4736 output_schema: simple.output_schema.clone(),
4737 })
4738 }
4739 (YamlToolFormat::Openai, _) => {
4740 Err("tools_format=openai requires OpenAI-style tool declarations".to_string())
4741 }
4742 (YamlToolFormat::Simplified, _) => {
4743 Err("tools_format=simplified requires simplified tool declarations".to_string())
4744 }
4745 })
4746 .collect()
4747}
4748
4749fn default_llm_output_schema() -> Value {
4750 json!({
4751 "type": "object",
4752 "additionalProperties": true
4753 })
4754}
4755
4756fn mock_rag(topic: &str) -> Value {
4757 let (kb_source, playbook) = match topic {
4758 "probation" => (
4759 "hr_policy/probation.md",
4760 "Collect manager review, performance evidence, and probation timeline.",
4761 ),
4762 "leave_request" => (
4763 "hr_policy/leave.md",
4764 "Validate leave balance, manager approval, and blackout dates.",
4765 ),
4766 "supply_chain_order_assessment" => (
4767 "supply_chain/order_assessment.md",
4768 "Review order specs, inventory risk, and vendor lead-time guidance.",
4769 ),
4770 "supply_chain_order_replacement" => (
4771 "supply_chain/order_replacement.md",
4772 "Collect order id, damage proof, and replacement SLA policy.",
4773 ),
4774 "termination_first_time_offense" => (
4775 "hr_policy/termination_first_offense.md",
4776 "Validate first-incident criteria and route to HRBP review.",
4777 ),
4778 "termination_repeated_offense" => (
4779 "hr_policy/termination_repeated_offense.md",
4780 "Collect prior warnings and escalation approvals before final action.",
4781 ),
4782 _ => (
4783 "shared/request_clarification.md",
4784 "Request clarifying details before routing.",
4785 ),
4786 };
4787
4788 json!({
4789 "kb_source": kb_source,
4790 "playbook": playbook,
4791 })
4792}
4793
/// Executes a `run_workflow_graph` tool call by running another YAML workflow
/// as a nested sub-run.
///
/// The payload must carry `workflow_id`, which is looked up in
/// `input.workflow_registry` (a map of workflow_id -> yaml_path) on the parent
/// run's context. A depth counter (`__subgraph_depth`, capped by
/// `__subgraph_max_depth`, default 3) is threaded through the sub-run's input
/// to prevent unbounded workflow recursion. `messages`, `email_text`, and the
/// registry itself are inherited from the parent input unless the payload
/// overrides them.
///
/// Returns a JSON summary (workflow id/path, terminal node and output, trace)
/// on success, or a human-readable error string.
async fn execute_subworkflow_tool_call(
    payload: &Value,
    context: &Value,
    client: &SimpleAgentsClient,
    custom_worker: Option<&dyn YamlWorkflowCustomWorkerExecutor>,
    parent_options: &YamlWorkflowRunOptions,
    parent_trace_context: Option<&TraceContext>,
    resolved_trace_id: Option<&str>,
) -> Result<Value, String> {
    let workflow_id = payload
        .get("workflow_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "run_workflow_graph requires payload.workflow_id".to_string())?;

    let input_context = context
        .get("input")
        .and_then(Value::as_object)
        .ok_or_else(|| "run_workflow_graph requires context.input".to_string())?;

    let registry = input_context
        .get("workflow_registry")
        .and_then(Value::as_object)
        .ok_or_else(|| {
            "run_workflow_graph requires input.workflow_registry map of workflow_id -> yaml_path"
                .to_string()
        })?;

    let workflow_path = registry
        .get(workflow_id)
        .and_then(Value::as_str)
        .ok_or_else(|| {
            format!(
                "workflow_registry has no entry for workflow_id '{}'",
                workflow_id
            )
        })?;

    // Recursion guard: refuse to spawn once the nesting budget is exhausted.
    let parent_depth = input_context
        .get("__subgraph_depth")
        .and_then(Value::as_u64)
        .unwrap_or(0);
    let max_depth = input_context
        .get("__subgraph_max_depth")
        .and_then(Value::as_u64)
        .unwrap_or(3);

    if parent_depth >= max_depth {
        return Err(format!(
            "run_workflow_graph depth limit reached (depth={}, max={})",
            parent_depth, max_depth
        ));
    }

    // The payload's `input` object (if any) seeds the sub-run's input; missing
    // well-known keys are inherited from the parent input below.
    let mut subworkflow_input = payload
        .get("input")
        .and_then(Value::as_object)
        .cloned()
        .unwrap_or_default();

    if !subworkflow_input.contains_key("messages") {
        if let Some(messages) = input_context.get("messages") {
            subworkflow_input.insert("messages".to_string(), messages.clone());
        }
    }

    if !subworkflow_input.contains_key("email_text") {
        if let Some(email_text) = input_context.get("email_text") {
            subworkflow_input.insert("email_text".to_string(), email_text.clone());
        }
    }

    // Pass the registry along so the sub-run can itself spawn sub-runs.
    if !subworkflow_input.contains_key("workflow_registry") {
        subworkflow_input.insert(
            "workflow_registry".to_string(),
            Value::Object(registry.clone()),
        );
    }

    // Always overwrite the depth bookkeeping; the payload must not fake it.
    subworkflow_input.insert(
        "__subgraph_depth".to_string(),
        Value::Number(serde_json::Number::from(parent_depth + 1)),
    );
    subworkflow_input.insert(
        "__subgraph_max_depth".to_string(),
        Value::Number(serde_json::Number::from(max_depth)),
    );

    // Propagate the parent's trace context so spans join the same trace.
    let subworkflow_options =
        build_subworkflow_options(parent_options, parent_trace_context, resolved_trace_id);

    let output = run_workflow_yaml_file_with_client_and_custom_worker_and_events_and_options(
        Path::new(workflow_path),
        &Value::Object(subworkflow_input),
        client,
        custom_worker,
        None,
        &subworkflow_options,
    )
    .await
    .map_err(|error| format!("subworkflow '{}' failed: {}", workflow_id, error))?;

    Ok(json!({
        "workflow_id": workflow_id,
        "workflow_path": workflow_path,
        "terminal_node": output.terminal_node,
        "terminal_output": output.terminal_output,
        "trace": output.trace,
    }))
}
4903
4904fn build_subworkflow_options(
4905 parent_options: &YamlWorkflowRunOptions,
4906 parent_trace_context: Option<&TraceContext>,
4907 resolved_trace_id: Option<&str>,
4908) -> YamlWorkflowRunOptions {
4909 let mut subworkflow_options = parent_options.clone();
4910 if parent_trace_context.is_some() || resolved_trace_id.is_some() {
4911 let trace_context = YamlWorkflowTraceContextInput {
4912 trace_id: resolved_trace_id
4913 .map(|value| value.to_string())
4914 .or_else(|| parent_trace_context.and_then(|ctx| ctx.trace_id.clone())),
4915 span_id: parent_trace_context.and_then(|ctx| ctx.span_id.clone()),
4916 parent_span_id: parent_trace_context.and_then(|ctx| ctx.parent_span_id.clone()),
4917 traceparent: parent_trace_context.and_then(|ctx| ctx.traceparent.clone()),
4918 tracestate: parent_trace_context.and_then(|ctx| ctx.tracestate.clone()),
4919 baggage: parent_trace_context
4920 .map(|ctx| ctx.baggage.clone())
4921 .unwrap_or_default(),
4922 };
4923 subworkflow_options.trace.context = Some(trace_context);
4924 }
4925 subworkflow_options
4926}
4927
4928fn mock_custom_worker_output(
4929 handler: &str,
4930 payload: &Value,
4931) -> Result<Value, YamlWorkflowRunError> {
4932 if handler == "get_employee_record" {
4933 let employee_name = payload
4934 .get("employee_name")
4935 .and_then(Value::as_str)
4936 .unwrap_or("Unknown Employee")
4937 .trim();
4938 let normalized_name = if employee_name.is_empty() {
4939 "Unknown Employee"
4940 } else {
4941 employee_name
4942 };
4943
4944 let (employee_id, location) = match normalized_name.to_ascii_lowercase().as_str() {
4945 "alex johnson" => ("EMP-2041", "Austin"),
4946 "priya sharma" => ("EMP-3378", "Bengaluru"),
4947 "marcus lee" => ("EMP-1196", "Singapore"),
4948 "sarah chen" => ("EMP-4450", "Toronto"),
4949 _ => ("EMP-0000", "Unassigned"),
4950 };
4951
4952 return Ok(json!({
4953 "employee_name": normalized_name,
4954 "employee_id": employee_id,
4955 "location": location,
4956 }));
4957 }
4958
4959 if let Some(topic) = payload.get("topic").and_then(Value::as_str) {
4960 let mut value = mock_rag(topic);
4961 if let Value::Object(object) = &mut value {
4962 object.insert("handler".to_string(), Value::String(handler.to_string()));
4963 }
4964 return Ok(value);
4965 }
4966
4967 Err(YamlWorkflowRunError::UnsupportedCustomHandler {
4968 handler: handler.to_string(),
4969 })
4970}
4971
/// Root of a parsed workflow YAML document: a set of nodes, unconditional
/// edges, and the id of the node execution starts from.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlWorkflow {
    /// Workflow identifier.
    pub id: String,
    /// Id of the node where execution begins; `verify_yaml_workflow` requires
    /// it to match a declared node.
    pub entry_node: String,
    /// Declared nodes; empty when omitted from the YAML.
    #[serde(default)]
    pub nodes: Vec<YamlNode>,
    /// Unconditional edges between nodes; empty when omitted from the YAML.
    #[serde(default)]
    pub edges: Vec<YamlEdge>,
}

/// A single workflow node: an id, a node-type payload, and optional per-node
/// configuration (prompt, schemas, globals updates).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNode {
    /// Unique node id referenced by edges and switch targets.
    pub id: String,
    /// Which kind of node this is; see [`YamlNodeType`].
    pub node_type: YamlNodeType,
    /// Optional node configuration.
    pub config: Option<YamlNodeConfig>,
}
4988
4989impl YamlNode {
4990 fn kind_name(&self) -> &'static str {
4991 if self.node_type.llm_call.is_some() {
4992 "llm_call"
4993 } else if self.node_type.switch.is_some() {
4994 "switch"
4995 } else if self.node_type.custom_worker.is_some() {
4996 "custom_worker"
4997 } else {
4998 "unknown"
4999 }
5000 }
5001}
5002
/// Node-type payload. Exactly one variant is expected to be populated; the
/// runtime treats a node with none of them as "unknown" (see
/// `YamlNode::kind_name`).
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeType {
    /// LLM completion step.
    pub llm_call: Option<YamlLlmCall>,
    /// Conditional branch on the run context.
    pub switch: Option<YamlSwitch>,
    /// Delegation to an application-provided (or mock) worker.
    pub custom_worker: Option<YamlCustomWorker>,
}

/// Configuration of an LLM completion node.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlLlmCall {
    /// Model identifier; must be non-empty (enforced by verification).
    pub model: String,
    /// Stream the completion. Combined with `heal`, streaming is disabled by
    /// the runtime (verification warns about the conflict).
    pub stream: Option<bool>,
    /// Streaming flag for JSON output; exact semantics live in the runtime —
    /// presumably streams JSON deltas as plain text (TODO confirm).
    pub stream_json_as_text: Option<bool>,
    /// Run the response through JSON healing.
    pub heal: Option<bool>,
    /// Context path to a pre-built message list (see
    /// `parse_messages_from_context`).
    pub messages_path: Option<String>,
    /// Whether to append the node prompt as a user message; consumed by the
    /// runtime (not visible here).
    pub append_prompt_as_user: Option<bool>,
    /// Declaration format for `tools`; defaults to OpenAI-style.
    #[serde(default)]
    pub tools_format: YamlToolFormat,
    /// Tool declarations; each must match `tools_format`.
    #[serde(default)]
    pub tools: Vec<YamlToolDeclaration>,
    /// Optional tool-choice directive (mode, function name, or full object).
    pub tool_choice: Option<YamlToolChoiceConfig>,
    /// Maximum LLM<->tool iterations; must be >= 1 (enforced by verification).
    pub max_tool_roundtrips: Option<u8>,
    /// Global key under which tool calls are recorded; must be non-empty when
    /// present (enforced by verification).
    pub tool_calls_global_key: Option<String>,
}

/// Format of the entries in `YamlLlmCall::tools`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum YamlToolFormat {
    /// OpenAI-style `{ type, function: { … } }` declarations (the default).
    #[default]
    Openai,
    /// Flat `{ name, description, input_schema, … }` declarations.
    Simplified,
}

/// A single tool declaration; untagged so serde picks the matching shape.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolDeclaration {
    OpenAi(YamlOpenAiToolDeclaration),
    Simplified(YamlSimplifiedToolDeclaration),
}

/// OpenAI-style tool declaration wrapper.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolDeclaration {
    /// Tool type; defaults to `function` during normalization when omitted.
    #[serde(rename = "type")]
    pub tool_type: Option<ToolType>,
    pub function: YamlOpenAiToolFunction,
}

/// OpenAI-style function declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlOpenAiToolFunction {
    pub name: String,
    pub description: Option<String>,
    /// JSON Schema for the tool input; verification treats a missing/null
    /// schema as an error.
    pub parameters: Option<Value>,
    /// Optional JSON Schema for the tool output.
    pub output_schema: Option<Value>,
}

/// Simplified (flat) tool declaration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSimplifiedToolDeclaration {
    pub name: String,
    pub description: Option<String>,
    /// JSON Schema for the tool input (required in this format).
    pub input_schema: Value,
    /// Optional JSON Schema for the tool output.
    pub output_schema: Option<Value>,
}

/// Accepted YAML shapes for `tool_choice`; untagged so serde tries each shape
/// in order. See `normalize_tool_choice` for the conversion.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum YamlToolChoiceConfig {
    /// Bare mode string (e.g. auto/none/required — whatever `ToolChoiceMode`
    /// accepts).
    Mode(ToolChoiceMode),
    /// Shorthand `{ function: "name" }` forcing a specific function.
    Function { function: String },
    /// Full OpenAI-style tool-choice object.
    OpenAi(ToolChoiceTool),
}

/// Conditional branch node: first matching branch wins, otherwise `default`.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitch {
    #[serde(default)]
    pub branches: Vec<YamlSwitchBranch>,
    /// Target node id when no branch condition matches; must exist
    /// (enforced by verification).
    pub default: String,
}

/// One switch branch: a `$.path == "literal"` condition and a target node id.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlSwitchBranch {
    pub condition: String,
    pub target: String,
}

/// Custom-worker node: names the handler the application (or the built-in
/// mock) should run.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlCustomWorker {
    pub handler: String,
}

/// Optional per-node configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlNodeConfig {
    /// Prompt template; presumably rendered via `interpolate_template` by the
    /// runtime (not visible here).
    pub prompt: Option<String>,
    /// JSON Schema for llm_call output; `schema` is accepted as an alias.
    #[serde(default, alias = "schema")]
    pub output_schema: Option<Value>,
    /// Free-form payload (e.g. for custom workers / tool calls).
    pub payload: Option<Value>,
    /// Map of global key -> context path evaluated after the node runs
    /// (see `apply_set_globals`).
    pub set_globals: Option<HashMap<String, String>>,
    /// Map of global key -> update op (see `apply_update_globals`).
    pub update_globals: Option<HashMap<String, YamlGlobalUpdate>>,
}

/// A single `update_globals` operation.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlGlobalUpdate {
    /// One of `set` | `append` | `increment` | `merge` (verified).
    pub op: String,
    /// Source context path; required for every op except `increment`.
    pub from: Option<String>,
    /// Increment amount; defaults to 1.0 at apply time.
    pub by: Option<f64>,
}

/// Unconditional edge between two node ids.
#[derive(Debug, Clone, Deserialize)]
pub struct YamlEdge {
    pub from: String,
    pub to: String,
}
5113
5114#[cfg(test)]
5115mod tests {
5116 use super::*;
5117 use simple_agent_type::provider::{Provider, ProviderRequest, ProviderResponse};
5118 use simple_agent_type::response::{CompletionChoice, CompletionResponse, Usage};
5119 use simple_agent_type::tool::{ToolCallFunction, ToolType};
5120 use simple_agent_type::{Result as SaResult, SimpleAgentsError};
5121 use simple_agents_core::SimpleAgentsClientBuilder;
5122 use std::fs;
5123 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5124 use std::sync::{Arc, Mutex, OnceLock};
5125
5126 fn stream_debug_env_lock() -> &'static Mutex<()> {
5127 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
5128 LOCK.get_or_init(|| Mutex::new(()))
5129 }
5130
    /// LLM executor stub for tests.
    struct MockExecutor;

    /// Event sink that records every emitted workflow event for assertions.
    struct RecordingSink {
        events: Mutex<Vec<YamlWorkflowEvent>>,
    }

    /// Event sink that reports cancellation as soon as any event is emitted.
    struct CancelAfterFirstEventSink {
        cancelled: AtomicBool,
    }

    impl YamlWorkflowEventSink for CancelAfterFirstEventSink {
        // Flip the flag on the first event; is_cancelled() then stops the run.
        fn emit(&self, _event: &YamlWorkflowEvent) {
            self.cancelled.store(true, Ordering::SeqCst);
        }

        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::SeqCst)
        }
    }

    /// Executor that counts how many times it is invoked.
    struct CountingExecutor {
        call_count: AtomicUsize,
    }

    /// Custom worker that captures the context it was invoked with.
    struct CapturingWorker {
        context: Mutex<Option<Value>>,
    }

    /// Provider that first answers with a tool call, then completes once the
    /// tool result is echoed back (see its `Provider` impl below).
    struct ToolLoopProvider;

    /// Provider that always requests a tool no workflow declares.
    struct UnknownToolProvider;

    /// Provider for reasoning-token usage accounting — presumably reports
    /// reasoning tokens in its usage payload; impl below (partially out of
    /// view here).
    struct ReasoningUsageProvider;

    /// Tool-loop provider variant for reasoning-token scenarios — impl not
    /// visible in this chunk.
    struct ToolLoopReasoningProvider;
5166
    #[async_trait]
    impl Provider for ToolLoopProvider {
        // Pretends to be the OpenAI provider so the client routes to it.
        fn name(&self) -> &str {
            "openai"
        }

        // Round-trips the request as the provider body so execute() can
        // inspect the original CompletionRequest.
        fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
            let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
            Ok(ProviderRequest::new("mock://tool-loop").with_body(body))
        }

        // Scripted three-phase behavior:
        // 1. tools declared, no tool result yet -> request `get_customer_context`;
        // 2. tools declared and a tool result present -> finish with JSON;
        // 3. no tools -> echo the last user prompt back inside a JSON payload.
        async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
            let request: CompletionRequest =
                serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;

            let has_tools = request
                .tools
                .as_ref()
                .is_some_and(|tools| !tools.is_empty());
            let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);

            let response = if has_tools && !has_tool_result {
                // Phase 1: ask for a tool call.
                CompletionResponse {
                    id: "resp_tool_1".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("").with_tool_calls(vec![ToolCall {
                            id: "call_get_context".to_string(),
                            tool_type: ToolType::Function,
                            function: ToolCallFunction {
                                name: "get_customer_context".to_string(),
                                arguments: "{\"order_id\":\"123\"}".to_string(),
                            },
                        }]),
                        finish_reason: FinishReason::ToolCalls,
                        logprobs: None,
                    }],
                    usage: Usage::new(10, 5),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else if has_tools && has_tool_result {
                // Phase 2: the tool answered; complete the loop.
                CompletionResponse {
                    id: "resp_tool_2".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant("{\"state\":\"done\"}"),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(12, 6),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            } else {
                // Phase 3 (no tools): echo the latest user prompt.
                let prompt = request
                    .messages
                    .iter()
                    .rev()
                    .find(|m| m.role == Role::User)
                    .map(|m| m.content.clone())
                    .unwrap_or_default();
                let payload = json!({
                    "subject": "ok",
                    "body": prompt,
                })
                .to_string();
                CompletionResponse {
                    id: "resp_final".to_string(),
                    model: request.model.clone(),
                    choices: vec![CompletionChoice {
                        index: 0,
                        message: Message::assistant(payload),
                        finish_reason: FinishReason::Stop,
                        logprobs: None,
                    }],
                    usage: Usage::new(8, 4),
                    created: None,
                    provider: Some(self.name().to_string()),
                    healing_metadata: None,
                }
            };

            let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
            Ok(ProviderResponse::new(200, body))
        }

        fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
            serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
        }
    }
5262
5263 #[async_trait]
5264 impl Provider for UnknownToolProvider {
5265 fn name(&self) -> &str {
5266 "openai"
5267 }
5268
5269 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5270 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5271 Ok(ProviderRequest::new("mock://unknown-tool").with_body(body))
5272 }
5273
5274 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5275 let request: CompletionRequest =
5276 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5277
5278 let response = CompletionResponse {
5279 id: "resp_unknown_tool".to_string(),
5280 model: request.model,
5281 choices: vec![CompletionChoice {
5282 index: 0,
5283 message: Message::assistant("").with_tool_calls(vec![ToolCall {
5284 id: "call_unknown".to_string(),
5285 tool_type: ToolType::Function,
5286 function: ToolCallFunction {
5287 name: "unknown_tool".to_string(),
5288 arguments: "{}".to_string(),
5289 },
5290 }]),
5291 finish_reason: FinishReason::ToolCalls,
5292 logprobs: None,
5293 }],
5294 usage: Usage::new(5, 2),
5295 created: None,
5296 provider: Some(self.name().to_string()),
5297 healing_metadata: None,
5298 };
5299
5300 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5301 Ok(ProviderResponse::new(200, body))
5302 }
5303
5304 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5305 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5306 }
5307 }
5308
5309 #[async_trait]
5310 impl Provider for ReasoningUsageProvider {
5311 fn name(&self) -> &str {
5312 "openai"
5313 }
5314
5315 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5316 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5317 Ok(ProviderRequest::new("mock://reasoning-usage").with_body(body))
5318 }
5319
5320 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5321 let request: CompletionRequest =
5322 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5323
5324 let mut usage = Usage::new(9, 5);
5325 usage.reasoning_tokens = Some(4);
5326 let response = CompletionResponse {
5327 id: "resp_reasoning".to_string(),
5328 model: request.model,
5329 choices: vec![CompletionChoice {
5330 index: 0,
5331 message: Message::assistant("{\"state\":\"ok\"}"),
5332 finish_reason: FinishReason::Stop,
5333 logprobs: None,
5334 }],
5335 usage,
5336 created: None,
5337 provider: Some(self.name().to_string()),
5338 healing_metadata: None,
5339 };
5340
5341 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5342 Ok(ProviderResponse::new(200, body))
5343 }
5344
5345 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5346 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5347 }
5348 }
5349
5350 #[async_trait]
5351 impl Provider for ToolLoopReasoningProvider {
5352 fn name(&self) -> &str {
5353 "openai"
5354 }
5355
5356 fn transform_request(&self, req: &CompletionRequest) -> SaResult<ProviderRequest> {
5357 let body = serde_json::to_value(req).map_err(SimpleAgentsError::from)?;
5358 Ok(ProviderRequest::new("mock://tool-loop-reasoning").with_body(body))
5359 }
5360
5361 async fn execute(&self, req: ProviderRequest) -> SaResult<ProviderResponse> {
5362 let request: CompletionRequest =
5363 serde_json::from_value(req.body).map_err(SimpleAgentsError::from)?;
5364
5365 let has_tools = request
5366 .tools
5367 .as_ref()
5368 .is_some_and(|tools| !tools.is_empty());
5369 let has_tool_result = request.messages.iter().any(|m| m.role == Role::Tool);
5370
5371 let response = if has_tools && !has_tool_result {
5372 let mut usage = Usage::new(10, 5);
5373 usage.reasoning_tokens = Some(2);
5374 CompletionResponse {
5375 id: "resp_tool_reasoning_1".to_string(),
5376 model: request.model.clone(),
5377 choices: vec![CompletionChoice {
5378 index: 0,
5379 message: Message::assistant("").with_tool_calls(vec![ToolCall {
5380 id: "call_get_context".to_string(),
5381 tool_type: ToolType::Function,
5382 function: ToolCallFunction {
5383 name: "get_customer_context".to_string(),
5384 arguments: "{\"order_id\":\"123\"}".to_string(),
5385 },
5386 }]),
5387 finish_reason: FinishReason::ToolCalls,
5388 logprobs: None,
5389 }],
5390 usage,
5391 created: None,
5392 provider: Some(self.name().to_string()),
5393 healing_metadata: None,
5394 }
5395 } else {
5396 let mut usage = Usage::new(12, 6);
5397 usage.reasoning_tokens = Some(3);
5398 CompletionResponse {
5399 id: "resp_tool_reasoning_2".to_string(),
5400 model: request.model,
5401 choices: vec![CompletionChoice {
5402 index: 0,
5403 message: Message::assistant("{\"state\":\"done\"}"),
5404 finish_reason: FinishReason::Stop,
5405 logprobs: None,
5406 }],
5407 usage,
5408 created: None,
5409 provider: Some(self.name().to_string()),
5410 healing_metadata: None,
5411 }
5412 };
5413
5414 let body = serde_json::to_value(response).map_err(SimpleAgentsError::from)?;
5415 Ok(ProviderResponse::new(200, body))
5416 }
5417
5418 fn transform_response(&self, resp: ProviderResponse) -> SaResult<CompletionResponse> {
5419 serde_json::from_value(resp.body).map_err(SimpleAgentsError::from)
5420 }
5421 }
5422
/// Test double: a custom-worker executor that always returns `payload`.
struct FixedToolWorker {
    // Fixed JSON value handed back from every `execute` call.
    payload: Value,
}
5426
/// Test double: records how many times its `execute` method is invoked.
struct CountingToolWorker {
    // Incremented once per `execute` call (SeqCst).
    execute_calls: AtomicUsize,
}
5430
5431 #[async_trait]
5432 impl YamlWorkflowCustomWorkerExecutor for FixedToolWorker {
5433 async fn execute(
5434 &self,
5435 _handler: &str,
5436 _payload: &Value,
5437 _email_text: &str,
5438 _context: &Value,
5439 ) -> Result<Value, String> {
5440 Ok(self.payload.clone())
5441 }
5442 }
5443
5444 #[async_trait]
5445 impl YamlWorkflowCustomWorkerExecutor for CountingToolWorker {
5446 async fn execute(
5447 &self,
5448 _handler: &str,
5449 _payload: &Value,
5450 _email_text: &str,
5451 _context: &Value,
5452 ) -> Result<Value, String> {
5453 self.execute_calls.fetch_add(1, Ordering::SeqCst);
5454 Ok(json!({"ok": true}))
5455 }
5456 }
5457
5458 #[async_trait]
5459 impl YamlWorkflowLlmExecutor for CountingExecutor {
5460 async fn complete_structured(
5461 &self,
5462 _request: YamlLlmExecutionRequest,
5463 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5464 ) -> Result<YamlLlmExecutionResult, String> {
5465 self.call_count.fetch_add(1, Ordering::SeqCst);
5466 Ok(YamlLlmExecutionResult {
5467 payload: json!({"state":"ok"}),
5468 usage: None,
5469 ttft_ms: None,
5470 tool_calls: Vec::new(),
5471 })
5472 }
5473 }
5474
5475 impl YamlWorkflowEventSink for RecordingSink {
5476 fn emit(&self, event: &YamlWorkflowEvent) {
5477 self.events
5478 .lock()
5479 .expect("recording sink lock should not be poisoned")
5480 .push(event.clone());
5481 }
5482 }
5483
5484 #[async_trait]
5485 impl YamlWorkflowCustomWorkerExecutor for CapturingWorker {
5486 async fn execute(
5487 &self,
5488 _handler: &str,
5489 _payload: &Value,
5490 _email_text: &str,
5491 context: &Value,
5492 ) -> Result<Value, String> {
5493 let mut guard = self
5494 .context
5495 .lock()
5496 .map_err(|_| "capturing worker lock should not be poisoned".to_string())?;
5497 *guard = Some(context.clone());
5498 Ok(json!({"ok": true}))
5499 }
5500 }
5501
5502 #[async_trait]
5503 impl YamlWorkflowLlmExecutor for MockExecutor {
5504 async fn complete_structured(
5505 &self,
5506 request: YamlLlmExecutionRequest,
5507 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5508 ) -> Result<YamlLlmExecutionResult, String> {
5509 let prompt = request.prompt;
5510 if prompt.contains("exactly one category") {
5511 return Ok(YamlLlmExecutionResult {
5512 payload: json!({"category":"termination","reason":"mock"}),
5513 usage: Some(YamlLlmTokenUsage {
5514 prompt_tokens: 10,
5515 completion_tokens: 5,
5516 total_tokens: 15,
5517 reasoning_tokens: None,
5518 }),
5519 ttft_ms: None,
5520 tool_calls: Vec::new(),
5521 });
5522 }
5523 if prompt.contains("Determine termination subtype") {
5524 return Ok(YamlLlmExecutionResult {
5525 payload: json!({"subtype":"repeated_offense","reason":"mock"}),
5526 usage: Some(YamlLlmTokenUsage {
5527 prompt_tokens: 12,
5528 completion_tokens: 6,
5529 total_tokens: 18,
5530 reasoning_tokens: None,
5531 }),
5532 ttft_ms: None,
5533 tool_calls: Vec::new(),
5534 });
5535 }
5536 if prompt.contains("Determine supply chain subtype") {
5537 return Ok(YamlLlmExecutionResult {
5538 payload: json!({"subtype":"order_replacement","reason":"mock"}),
5539 usage: Some(YamlLlmTokenUsage {
5540 prompt_tokens: 11,
5541 completion_tokens: 4,
5542 total_tokens: 15,
5543 reasoning_tokens: None,
5544 }),
5545 ttft_ms: None,
5546 tool_calls: Vec::new(),
5547 });
5548 }
5549 Err("unexpected prompt".to_string())
5550 }
5551 }
5552
#[tokio::test]
async fn runs_yaml_workflow_and_returns_step_timings() {
    // Full branching pipeline: top-level classify -> switch -> subtype
    // classify -> switch -> custom-worker RAG lookup.
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
  - id: route_top_level
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_top_level.output.category == "termination"'
            target: classify_termination_subtype
        default: rag_clarification
  - id: classify_termination_subtype
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Determine termination subtype:
            {{ input.email_text }}
  - id: route_termination_subtype
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify_termination_subtype.output.subtype == "repeated_offense"'
            target: rag_termination_repeated_offense
        default: rag_clarification
  - id: rag_termination_repeated_offense
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: termination_repeated_offense
  - id: rag_clarification
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: clarification
edges:
  - from: classify_top_level
    to: route_top_level
  - from: classify_termination_subtype
    to: route_termination_subtype
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let output = run_email_workflow_yaml(&workflow, "test", &MockExecutor)
        .await
        .expect("yaml workflow should execute");

    assert_eq!(output.workflow_id, "email-intake-classification");
    // MockExecutor answers "termination" then "repeated_offense", so the run
    // must end at the repeated-offense RAG node.
    assert_eq!(output.terminal_node, "rag_termination_repeated_offense");
    assert!(!output.step_timings.is_empty());
    // One timing entry per executed node in the trace.
    assert_eq!(output.step_timings.len(), output.trace.len());
    assert!(output
        .outputs
        .contains_key("rag_termination_repeated_offense"));
    // Totals are the sums of the two mocked LLM usages: 10+12 / 5+6 / 15+18.
    assert_eq!(output.total_input_tokens, 22);
    assert_eq!(output.total_output_tokens, 11);
    assert_eq!(output.total_tokens, 33);
}
5626
#[tokio::test]
async fn emits_resolved_llm_input_event_with_bindings() {
    // Single-node workflow whose prompt interpolates `input.email_text`.
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    let output = run_email_workflow_yaml_with_custom_worker_and_events(
        &workflow,
        "Need help with termination",
        &MockExecutor,
        None,
        Some(&sink),
    )
    .await
    .expect("yaml workflow should execute");

    assert_eq!(output.terminal_node, "classify_top_level");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let llm_event = events
        .iter()
        .find(|event| event.event_type == "node_llm_input_resolved")
        .expect("expected llm input telemetry event");

    // The telemetry event must expose the fully resolved prompt plus
    // per-placeholder binding details.
    let metadata = llm_event
        .metadata
        .as_ref()
        .expect("llm input event must include metadata");
    assert_eq!(metadata["model"], Value::String("gpt-4.1".to_string()));
    assert_eq!(metadata["stream_requested"], Value::Bool(false));
    assert_eq!(metadata["heal_requested"], Value::Bool(false));
    assert!(metadata["prompt"]
        .as_str()
        .expect("prompt should be a string")
        .contains("Need help with termination"));

    // Exactly one binding: the email_text placeholder, resolved as a string.
    let bindings = metadata["bindings"]
        .as_array()
        .expect("bindings should be an array");
    assert_eq!(bindings.len(), 1);
    assert_eq!(
        bindings[0]["source_path"],
        Value::String("input.email_text".to_string())
    );
    assert_eq!(
        bindings[0]["resolved"],
        Value::String("Need help with termination".to_string())
    );
    assert_eq!(bindings[0]["missing"], Value::Bool(false));
    assert_eq!(
        bindings[0]["resolved_type"],
        Value::String("string".to_string())
    );
}
5699
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_by_default() {
    let yaml = r#"
id: nerdstats-default
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    // Default run options: nerdstats telemetry is enabled unless disabled.
    let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // Nerdstats must mirror the run output's headline numbers.
    assert_eq!(nerdstats["workflow_id"], Value::String(output.workflow_id));
    assert_eq!(
        nerdstats["terminal_node"],
        Value::String(output.terminal_node)
    );
    assert_eq!(
        nerdstats["total_tokens"],
        Value::Number(output.total_tokens.into())
    );
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_usage".to_string())
    );
    // Legacy keys stay absent; step_details is the supported shape.
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert!(nerdstats.get("step_details").is_some());
    assert!(nerdstats.get("llm_node_models").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
}
5776
#[tokio::test]
async fn workflow_completed_event_omits_nerdstats_when_disabled() {
    let yaml = r#"
id: nerdstats-disabled
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };
    // Explicitly turn nerdstats telemetry off.
    let options = YamlWorkflowRunOptions {
        telemetry: YamlWorkflowTelemetryConfig {
            nerdstats: false,
            ..YamlWorkflowTelemetryConfig::default()
        },
        ..YamlWorkflowRunOptions::default()
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &MockExecutor,
        None,
        Some(&sink),
        &options,
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    // With nerdstats disabled the completion event carries no metadata at all.
    assert!(completed.metadata.is_none());
}
5826
/// Mock LLM executor that reports a TTFT only when the request is streaming.
struct StreamAwareMockExecutor;
5828
5829 #[async_trait]
5830 impl YamlWorkflowLlmExecutor for StreamAwareMockExecutor {
5831 async fn complete_structured(
5832 &self,
5833 request: YamlLlmExecutionRequest,
5834 _event_sink: Option<&dyn YamlWorkflowEventSink>,
5835 ) -> Result<YamlLlmExecutionResult, String> {
5836 Ok(YamlLlmExecutionResult {
5837 payload: json!({"state":"ok"}),
5838 usage: Some(YamlLlmTokenUsage {
5839 prompt_tokens: 20,
5840 completion_tokens: 10,
5841 total_tokens: 30,
5842 reasoning_tokens: None,
5843 }),
5844 ttft_ms: if request.stream { Some(12) } else { None },
5845 tool_calls: Vec::new(),
5846 })
5847 }
5848 }
5849
#[tokio::test]
async fn workflow_completed_event_includes_nerdstats_for_streaming_nodes() {
    // The single LLM node requests streaming, so the executor reports a TTFT.
    let yaml = r#"
id: nerdstats-streaming
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        stream: true
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let sink = RecordingSink {
        events: Mutex::new(Vec::new()),
    };

    run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &StreamAwareMockExecutor,
        None,
        Some(&sink),
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    let events = sink
        .events
        .lock()
        .expect("recording sink lock should not be poisoned");
    let completed = events
        .iter()
        .find(|event| event.event_type == "workflow_completed")
        .expect("expected workflow_completed event");
    let metadata = completed
        .metadata
        .as_ref()
        .expect("workflow_completed should include metadata by default");
    let nerdstats = metadata
        .get("nerdstats")
        .expect("nerdstats should be present by default");

    // StreamAwareMockExecutor reports usage (20/10/30) and ttft 12 ms.
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(true));
    assert_eq!(nerdstats["total_tokens"], Value::Number(30u64.into()));
    assert_eq!(nerdstats["ttft_ms"], Value::Number(12u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("classify".to_string())
    );
}
5913
#[test]
fn workflow_nerdstats_marks_stream_token_metrics_unavailable() {
    // An LLM step with no token counts at all models a streaming provider
    // that never reported usage.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1".to_string()),
            elapsed_ms: 100,
            prompt_tokens: None,
            completion_tokens: None,
            total_tokens: None,
            reasoning_tokens: None,
            tokens_per_second: None,
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: None,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_tokens: 0,
        total_reasoning_tokens: None,
        tokens_per_second: 0.0,
        trace_id: Some("trace-1".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    // Token metrics must be flagged unavailable, with the stream-specific
    // source marker, and the totals nulled rather than reported as zero.
    assert_eq!(nerdstats["token_metrics_available"], Value::Bool(false));
    assert_eq!(
        nerdstats["token_metrics_source"],
        Value::String("provider_stream_usage_unavailable".to_string())
    );
    assert_eq!(nerdstats["total_tokens"], Value::Null);
    assert_eq!(nerdstats["ttft_ms"], Value::Null);
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
}
5967
#[test]
fn workflow_nerdstats_includes_ttft_when_available() {
    // Same shape as the unavailable-metrics case, but with full token counts
    // and a recorded TTFT.
    let output = YamlWorkflowRunOutput {
        workflow_id: "workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["llm_node".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "llm_node".to_string(),
        terminal_output: None,
        step_timings: vec![YamlStepTiming {
            node_id: "llm_node".to_string(),
            node_kind: "llm_call".to_string(),
            model_name: Some("gpt-4.1".to_string()),
            elapsed_ms: 100,
            prompt_tokens: Some(10),
            completion_tokens: Some(15),
            total_tokens: Some(25),
            reasoning_tokens: None,
            tokens_per_second: Some(150.0),
        }],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(42),
        total_input_tokens: 10,
        total_output_tokens: 15,
        total_tokens: 25,
        total_reasoning_tokens: None,
        tokens_per_second: 150.0,
        trace_id: Some("trace-2".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(nerdstats["ttft_ms"], Value::Number(42u64.into()));
    assert!(nerdstats.get("step_timings").is_none());
    assert!(nerdstats.get("llm_node_metrics").is_none());
    assert_eq!(
        nerdstats["step_details"][0]["node_id"],
        Value::String("llm_node".to_string())
    );
    assert_eq!(
        nerdstats["step_details"][0]["model_name"],
        Value::String("gpt-4.1".to_string())
    );
}
6015
#[test]
fn workflow_nerdstats_schema_contract_is_stable() {
    // Golden test: the full nerdstats JSON shape is compared field-for-field
    // so accidental schema changes fail loudly.
    let output = YamlWorkflowRunOutput {
        workflow_id: "schema-workflow".to_string(),
        entry_node: "start".to_string(),
        email_text: "hello".to_string(),
        trace: vec!["classify".to_string(), "route".to_string()],
        outputs: BTreeMap::new(),
        terminal_node: "route".to_string(),
        terminal_output: None,
        step_timings: vec![
            YamlStepTiming {
                node_id: "classify".to_string(),
                node_kind: "llm_call".to_string(),
                model_name: Some("gpt-4.1".to_string()),
                elapsed_ms: 100,
                prompt_tokens: Some(11),
                completion_tokens: Some(22),
                total_tokens: Some(33),
                reasoning_tokens: Some(7),
                tokens_per_second: Some(220.0),
            },
            YamlStepTiming {
                node_id: "route".to_string(),
                node_kind: "switch".to_string(),
                model_name: None,
                elapsed_ms: 0,
                prompt_tokens: None,
                completion_tokens: None,
                total_tokens: None,
                reasoning_tokens: None,
                tokens_per_second: None,
            },
        ],
        llm_node_metrics: BTreeMap::new(),
        llm_node_models: BTreeMap::new(),
        total_elapsed_ms: 100,
        ttft_ms: Some(9),
        total_input_tokens: 11,
        total_output_tokens: 22,
        total_tokens: 33,
        total_reasoning_tokens: Some(7),
        tokens_per_second: 220.0,
        trace_id: Some("trace-schema".to_string()),
        metadata: None,
    };

    let nerdstats = workflow_nerdstats(&output);
    // Note: the switch step omits all None token fields (serde skips them),
    // so its step_details entry carries only node_id/node_kind/elapsed_ms.
    let expected = json!({
        "workflow_id": "schema-workflow",
        "terminal_node": "route",
        "total_elapsed_ms": 100,
        "ttft_ms": 9,
        "step_details": [
            {
                "node_id": "classify",
                "node_kind": "llm_call",
                "model_name": "gpt-4.1",
                "elapsed_ms": 100,
                "prompt_tokens": 11,
                "completion_tokens": 22,
                "total_tokens": 33,
                "reasoning_tokens": 7,
                "tokens_per_second": 220.0
            },
            {
                "node_id": "route",
                "node_kind": "switch",
                "elapsed_ms": 0
            }
        ],
        "total_input_tokens": 11,
        "total_output_tokens": 22,
        "total_tokens": 33,
        "total_reasoning_tokens": 7,
        "tokens_per_second": 220.0,
        "trace_id": "trace-schema",
        "token_metrics_available": true,
        "token_metrics_source": "provider_usage",
        "llm_nodes_without_usage": []
    });

    assert_eq!(nerdstats, expected);
}
6100
/// Mock LLM executor that requires a two-message chat history in the request.
struct MessageHistoryExecutor;
6102
6103 #[async_trait]
6104 impl YamlWorkflowLlmExecutor for MessageHistoryExecutor {
6105 async fn complete_structured(
6106 &self,
6107 request: YamlLlmExecutionRequest,
6108 _event_sink: Option<&dyn YamlWorkflowEventSink>,
6109 ) -> Result<YamlLlmExecutionResult, String> {
6110 let messages = request
6111 .messages
6112 .ok_or_else(|| "expected messages in request".to_string())?;
6113 if messages.len() != 2 {
6114 return Err(format!("expected 2 messages, got {}", messages.len()));
6115 }
6116 Ok(YamlLlmExecutionResult {
6117 payload: json!({"category":"termination","reason":"history"}),
6118 usage: Some(YamlLlmTokenUsage {
6119 prompt_tokens: 7,
6120 completion_tokens: 3,
6121 total_tokens: 10,
6122 reasoning_tokens: None,
6123 }),
6124 ttft_ms: None,
6125 tool_calls: Vec::new(),
6126 })
6127 }
6128 }
6129
#[tokio::test]
async fn supports_messages_path_in_workflow_input() {
    // The node pulls its chat history from `input.messages` and does not
    // append the prompt as an extra user message.
    let yaml = r#"
id: email-intake-classification
entry_node: classify_top_level
nodes:
  - id: classify_top_level
    node_type:
      llm_call:
        model: gpt-4.1
        messages_path: input.messages
        append_prompt_as_user: false
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({
        "email_text": "ignored",
        "messages": [
            {"role": "system", "content": "You are a classifier"},
            {"role": "user", "content": "Please classify this"}
        ]
    });

    // MessageHistoryExecutor fails unless exactly these two messages arrive.
    let output = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
        .await
        .expect("workflow should use chat history from input");

    assert_eq!(output.terminal_node, "classify_top_level");
    assert_eq!(
        output.outputs["classify_top_level"]["output"]["reason"],
        Value::String("history".to_string())
    );
}
6163
#[tokio::test]
async fn wrapper_entrypoints_produce_equivalent_outputs() {
    // The base runner and its two convenience wrappers must be
    // behaviorally identical for the same workflow and input.
    let yaml = r#"
id: wrapper-equivalence
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    let input = json!({"email_text":"hello"});

    let a = run_workflow_yaml(&workflow, &input, &MockExecutor)
        .await
        .expect("base entrypoint should execute");
    let b = run_workflow_yaml_with_custom_worker(&workflow, &input, &MockExecutor, None)
        .await
        .expect("custom worker wrapper should execute");
    let c = run_workflow_yaml_with_custom_worker_and_events_and_options(
        &workflow,
        &input,
        &MockExecutor,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("events/options wrapper should execute");

    assert_eq!(a.workflow_id, b.workflow_id);
    assert_eq!(a.workflow_id, c.workflow_id);
    assert_eq!(a.terminal_node, b.terminal_node);
    assert_eq!(a.terminal_node, c.terminal_node);
    assert_eq!(a.outputs, b.outputs);
    assert_eq!(a.outputs, c.outputs);
    assert_eq!(a.total_tokens, b.total_tokens);
    assert_eq!(a.total_tokens, c.total_tokens);
}
6209
#[tokio::test]
async fn yaml_llm_tool_calling_captures_traces_and_supports_globals_reference() {
    // Node 1 performs one tool roundtrip and publishes the call trace under
    // `globals.audit`; node 2 references that global in its prompt.
    let yaml = r#"
id: tool-calling-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tool_calls_global_key: audit
        tools:
          - name: get_customer_context
            description: Fetch customer context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
  - id: personalize
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Write an email greeting for {{ globals.audit.0.output.customer_name }}.
          output_schema:
            type: object
            properties:
              subject: { type: string }
              body: { type: string }
            required: [subject, body]
edges:
  - from: generate_with_tool
    to: personalize
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // ToolLoopProvider: first call requests the tool, second call completes.
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ToolLoopProvider))
        .build()
        .expect("client should build");
    let worker = FixedToolWorker {
        payload: json!({"customer_name": "Ava"}),
    };

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        Some(&worker),
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    assert_eq!(output.trace, vec!["generate_with_tool", "personalize"]);
    // The tool-call trace records the worker's output verbatim.
    assert_eq!(
        output.outputs["generate_with_tool"]["tool_calls"][0]["output"]["customer_name"],
        Value::String("Ava".to_string())
    );
    // And the second node's prompt actually consumed the global reference.
    let body = output.outputs["personalize"]["output"]["body"]
        .as_str()
        .expect("body should be string");
    assert!(body.contains("Ava"));
}
6292
#[tokio::test]
async fn workflow_with_client_preserves_reasoning_tokens_in_output_and_nerdstats() {
    let yaml = r#"
id: reasoning-usage-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Return JSON only:
            {"state":"ok"}
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

    let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
    // ReasoningUsageProvider reports reasoning_tokens = 4 in its usage.
    let client = SimpleAgentsClientBuilder::new()
        .with_provider(Arc::new(ReasoningUsageProvider))
        .build()
        .expect("client should build");

    let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
        &workflow,
        &json!({"email_text":"hello"}),
        &client,
        None,
        None,
        &YamlWorkflowRunOptions::default(),
    )
    .await
    .expect("workflow should execute");

    // Reasoning tokens must survive into both the run output...
    assert_eq!(output.total_reasoning_tokens, Some(4));
    assert_eq!(output.step_timings.len(), 1);
    assert_eq!(output.step_timings[0].reasoning_tokens, Some(4));

    // ...and the derived nerdstats payload.
    let nerdstats = workflow_nerdstats(&output);
    assert_eq!(
        nerdstats["total_reasoning_tokens"],
        Value::Number(4u64.into())
    );
    assert_eq!(
        nerdstats["step_details"][0]["reasoning_tokens"],
        Value::Number(4u64.into())
    );
}
6345
    #[tokio::test]
    async fn workflow_with_tools_accumulates_reasoning_tokens_across_roundtrips() {
        // One llm_call node with a tool and max_tool_roundtrips: 1, backed by
        // the ToolLoopReasoningProvider fixture. The run must report the
        // accumulated reasoning-token count (5, per the fixture) in the total,
        // the single step's timing, and nerdstats.
        let yaml = r#"
id: tool-reasoning-workflow
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopReasoningProvider))
            .build()
            .expect("client should build");
        // Worker returns a fixed payload that satisfies the tool output schema.
        let worker = FixedToolWorker {
            payload: json!({"customer_name": "Ava"}),
        };

        let output = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None, // no event sink
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect("workflow should execute");

        // All roundtrips are attributed to the single workflow step.
        assert_eq!(output.total_reasoning_tokens, Some(5));
        assert_eq!(output.step_timings.len(), 1);
        assert_eq!(output.step_timings[0].reasoning_tokens, Some(5));

        let nerdstats = workflow_nerdstats(&output);
        assert_eq!(
            nerdstats["total_reasoning_tokens"],
            Value::Number(5u64.into())
        );
        assert_eq!(
            nerdstats["step_details"][0]["reasoning_tokens"],
            Value::Number(5u64.into())
        );
    }
6414
    #[tokio::test]
    async fn yaml_llm_tool_output_schema_mismatch_hard_fails_node() {
        // The tool declares an output_schema requiring `customer_name`. The
        // worker below returns a different shape, which must hard-fail the
        // node with an Llm error mentioning schema validation.
        let yaml = r#"
id: tool-calling-schema-fail
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
              additionalProperties: false
            output_schema:
              type: object
              properties:
                customer_name: { type: string }
              required: [customer_name]
              additionalProperties: false
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(ToolLoopProvider))
            .build()
            .expect("client should build");
        // Payload deliberately violates the declared tool output schema.
        let worker = FixedToolWorker {
            payload: json!({"unexpected": "shape"}),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None, // no event sink
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should hard-fail on schema mismatch");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("output failed schema validation"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }
    }
6476
    #[tokio::test]
    async fn yaml_llm_unknown_tool_is_rejected_before_custom_worker_execution() {
        // The UnknownToolProvider fixture requests a tool name not declared in
        // the YAML. The run must fail with an Llm error naming the unknown
        // tool, and the custom worker must never be executed.
        let yaml = r#"
id: unknown-tool-rejected
entry_node: generate_with_tool
nodes:
  - id: generate_with_tool
    node_type:
      llm_call:
        model: gpt-4.1
        tools_format: simplified
        max_tool_roundtrips: 1
        tools:
          - name: get_customer_context
            input_schema:
              type: object
              properties:
                order_id: { type: string }
              required: [order_id]
        config:
          output_schema:
            type: object
            properties:
              state: { type: string }
            required: [state]
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let client = SimpleAgentsClientBuilder::new()
            .with_provider(Arc::new(UnknownToolProvider))
            .build()
            .expect("client should build");
        // Counts execute() invocations so we can prove the worker never ran.
        let worker = CountingToolWorker {
            execute_calls: AtomicUsize::new(0),
        };

        let error = run_workflow_yaml_with_client_and_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &client,
            Some(&worker),
            None, // no event sink
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should reject unknown tool before executing worker");

        match error {
            YamlWorkflowRunError::Llm { message, .. } => {
                assert!(message.contains("model requested unknown tool 'unknown_tool'"));
            }
            other => panic!("expected llm error, got {other:?}"),
        }

        // The rejection happened before any worker call.
        assert_eq!(worker.execute_calls.load(Ordering::SeqCst), 0);
    }
6533
6534 #[test]
6535 fn validates_tools_format_mismatch() {
6536 let yaml = r#"
6537id: mismatch
6538entry_node: generate
6539nodes:
6540 - id: generate
6541 node_type:
6542 llm_call:
6543 model: gpt-4.1
6544 tools_format: openai
6545 tools:
6546 - name: get_customer_context
6547 input_schema:
6548 type: object
6549 properties:
6550 order_id: { type: string }
6551 required: [order_id]
6552"#;
6553
6554 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6555 let diagnostics = verify_yaml_workflow(&workflow);
6556 assert!(diagnostics
6557 .iter()
6558 .any(|diagnostic| diagnostic.code == "invalid_tools_format"));
6559 }
6560
6561 #[test]
6562 fn mock_custom_worker_supports_get_employee_record() {
6563 let result = mock_custom_worker_output(
6564 "get_employee_record",
6565 &json!({"employee_name": "Alex Johnson"}),
6566 )
6567 .expect("mock tool should resolve employee record");
6568
6569 assert_eq!(result["employee_id"], Value::String("EMP-2041".to_string()));
6570 assert_eq!(result["location"], Value::String("Austin".to_string()));
6571 }
6572
    #[tokio::test]
    async fn custom_worker_receives_trace_context_block() {
        // Custom workers must receive the resolved trace information (ids
        // supplied via run options plus tenant metadata) inside the context
        // payload they are handed.
        let yaml = r#"
id: custom-worker-trace-context
entry_node: lookup
nodes:
  - id: lookup
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: demo
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Worker fixture that records the context block it was given.
        let worker = CapturingWorker {
            context: Mutex::new(None),
        };
        let options = YamlWorkflowRunOptions {
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-ctx".to_string()),
                    traceparent: Some("00-trace-fixed-ctx-span-fixed-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5".to_string()),
                    request_id: Some("turn-7".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            ..YamlWorkflowRunOptions::default()
        };

        run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            Some(&worker),
            None, // no event sink
            &options,
        )
        .await
        .expect("workflow should execute");

        let captured_context = worker
            .context
            .lock()
            .expect("capturing worker lock should not be poisoned")
            .clone()
            .expect("custom worker should receive context");

        // trace.context carries the ids supplied via run options...
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("trace_id"))
                .and_then(Value::as_str),
            Some("trace-fixed-ctx")
        );
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("context"))
                .and_then(|context| context.get("traceparent"))
                .and_then(Value::as_str),
            Some("00-trace-fixed-ctx-span-fixed-01")
        );
        // ...and trace.tenant carries the tenant metadata.
        assert_eq!(
            captured_context
                .get("trace")
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("7fd67af3-c67d-46cb-95af-08f8ed7b06a5")
        );
    }
6651
    #[tokio::test]
    async fn event_sink_cancellation_stops_workflow_before_llm_execution() {
        // A sink that signals cancellation after the first event must stop
        // the run with EventSinkCancelled before the LLM executor is invoked.
        let yaml = r#"
id: cancellation-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // Counts executor invocations so we can prove the LLM never ran.
        let executor = CountingExecutor {
            call_count: AtomicUsize::new(0),
        };
        let sink = CancelAfterFirstEventSink {
            cancelled: AtomicBool::new(false),
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &executor,
            None, // no custom worker
            Some(&sink),
            &YamlWorkflowRunOptions::default(),
        )
        .await
        .expect_err("workflow should stop when sink signals cancellation");

        assert!(matches!(
            err,
            YamlWorkflowRunError::EventSinkCancelled { .. }
        ));
        // Cancellation happened before any executor call.
        assert_eq!(executor.call_count.load(Ordering::SeqCst), 0);
    }
6693
6694 #[tokio::test]
6695 async fn rejects_invalid_messages_path_shape() {
6696 let yaml = r#"
6697id: email-intake-classification
6698entry_node: classify_top_level
6699nodes:
6700 - id: classify_top_level
6701 node_type:
6702 llm_call:
6703 model: gpt-4.1
6704 messages_path: input.messages
6705"#;
6706
6707 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6708 let input = json!({
6709 "email_text": "ignored",
6710 "messages": "not-a-list"
6711 });
6712
6713 let err = run_workflow_yaml(&workflow, &input, &MessageHistoryExecutor)
6714 .await
6715 .expect_err("workflow should fail for invalid messages shape");
6716
6717 assert!(matches!(err, YamlWorkflowRunError::Llm { .. }));
6718 }
6719
6720 #[test]
6721 fn renders_yaml_workflow_to_mermaid_with_switch_labels() {
6722 let yaml = r#"
6723id: chat-workflow
6724entry_node: decide
6725nodes:
6726 - id: decide
6727 node_type:
6728 switch:
6729 branches:
6730 - condition: '$.input.mode == "draft"'
6731 target: draft
6732 default: ask
6733 - id: draft
6734 node_type:
6735 llm_call:
6736 model: gpt-4.1
6737 - id: ask
6738 node_type:
6739 llm_call:
6740 model: gpt-4.1
6741edges:
6742 - from: draft
6743 to: ask
6744"#;
6745
6746 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6747 let mermaid = yaml_workflow_to_mermaid(&workflow);
6748
6749 assert!(mermaid.contains("flowchart TD"));
6750 assert!(mermaid.contains("decide -- \"route1\" --> draft"));
6751 assert!(mermaid.contains("decide -- \"default\" --> ask"));
6752 assert!(mermaid.contains("draft --> ask"));
6753 }
6754
6755 #[test]
6756 fn renders_yaml_workflow_tools_as_colored_tool_nodes() {
6757 let yaml = r#"
6758id: tool-graph
6759entry_node: chat
6760nodes:
6761 - id: chat
6762 node_type:
6763 llm_call:
6764 model: gemini-3-flash
6765 tools_format: simplified
6766 tools:
6767 - name: run_workflow_graph
6768 input_schema:
6769 type: object
6770 properties:
6771 workflow_id: { type: string }
6772 required: [workflow_id]
6773edges: []
6774"#;
6775
6776 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6777 let mermaid = yaml_workflow_to_mermaid(&workflow);
6778
6779 assert!(mermaid.contains("chat__tool_0"));
6780 assert!(mermaid.contains("chat -.-> chat__tool_0"));
6781 assert!(mermaid.contains("classDef toolNode"));
6782 assert!(mermaid.contains("class chat__tool_0 toolNode;"));
6783 }
6784
6785 #[test]
6786 fn renders_yaml_workflow_file_to_mermaid_with_subgraph_cluster_when_present() {
6787 let base_dir = std::env::temp_dir().join(format!(
6788 "simple_agents_mermaid_subgraph_{}",
6789 std::time::SystemTime::now()
6790 .duration_since(std::time::UNIX_EPOCH)
6791 .expect("unix epoch")
6792 .as_nanos()
6793 ));
6794 fs::create_dir_all(&base_dir).expect("temp dir should be created");
6795
6796 let orchestrator_path = base_dir.join("email-chat-orchestrator-with-subgraph-tool.yaml");
6797 let subgraph_path = base_dir.join("hr-warning-email-subgraph.yaml");
6798
6799 let orchestrator_yaml = r#"
6800id: email-chat-orchestrator-with-subgraph-tool
6801entry_node: respond_casual
6802nodes:
6803 - id: respond_casual
6804 node_type:
6805 llm_call:
6806 model: gemini-3-flash
6807 tools_format: simplified
6808 tools:
6809 - name: run_workflow_graph
6810 input_schema:
6811 type: object
6812 properties:
6813 workflow_id: { type: string }
6814 required: [workflow_id]
6815 config:
6816 prompt: |
6817 Call with:
6818 {
6819 "workflow_id": "hr_warning_email_subgraph"
6820 }
6821edges: []
6822"#;
6823
6824 let subgraph_yaml = r#"
6825id: hr-warning-email-subgraph
6826entry_node: draft_hr_warning_email
6827nodes:
6828 - id: draft_hr_warning_email
6829 node_type:
6830 llm_call:
6831 model: gemini-3-flash
6832edges: []
6833"#;
6834
6835 fs::write(&orchestrator_path, orchestrator_yaml).expect("orchestrator yaml written");
6836 fs::write(&subgraph_path, subgraph_yaml).expect("subgraph yaml written");
6837
6838 let mermaid =
6839 yaml_workflow_file_to_mermaid(&orchestrator_path).expect("mermaid should render");
6840
6841 assert!(mermaid.contains("Main: email-chat-orchestrator-with-subgraph-tool"));
6842 assert!(mermaid.contains("Subgraph: hr_warning_email_subgraph"));
6843 assert!(mermaid.contains("calls hr_warning_email_subgraph"));
6844 assert!(mermaid.contains("subgraph_1__draft_hr_warning_email"));
6845
6846 fs::remove_dir_all(base_dir).expect("temp dir removed");
6847 }
6848
    #[test]
    fn converts_yaml_workflow_to_ir_definition() {
        // YAML with one of each node kind (llm_call, switch, custom_worker)
        // plus an explicit edge must convert into an IR definition that keeps
        // every node id and adds the synthetic `__yaml_start` entry node.
        let yaml = r#"
id: chat-workflow
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            classify
  - id: route
    node_type:
      switch:
        branches:
          - condition: '$.nodes.classify.output.kind == "x"'
            target: done
        default: done
  - id: done
    node_type:
      custom_worker:
        handler: GetRagData
        config:
          payload:
            topic: test
edges:
  - from: classify
    to: route
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let ir = yaml_workflow_to_ir(&workflow).expect("yaml should convert to ir");

        assert_eq!(ir.name, "chat-workflow");
        // Synthetic entry node plus all declared nodes survive conversion.
        assert!(ir.nodes.iter().any(|n| n.id == "__yaml_start"));
        assert!(ir.nodes.iter().any(|n| n.id == "classify"));
        assert!(ir.nodes.iter().any(|n| n.id == "route"));
        assert!(ir.nodes.iter().any(|n| n.id == "done"));
    }
6890
6891 #[test]
6892 fn supports_yaml_to_ir_when_messages_path_is_used() {
6893 let yaml = r#"
6894id: chat-workflow
6895entry_node: classify
6896nodes:
6897 - id: classify
6898 node_type:
6899 llm_call:
6900 model: gpt-4.1
6901 messages_path: input.messages
6902"#;
6903
6904 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
6905 let ir =
6906 yaml_workflow_to_ir(&workflow).expect("messages_path should convert to tool-based IR");
6907 assert!(ir.nodes.iter().any(|node| matches!(
6908 node.kind,
6909 crate::ir::NodeKind::Tool { ref tool, .. } if tool == "__yaml_llm_call"
6910 )));
6911 }
6912
    #[tokio::test]
    async fn workflow_output_contains_trace_id_in_both_locations() {
        // With no trace options supplied, the run generates a trace id and
        // must expose it both as `output.trace_id` and under
        // `metadata.telemetry.trace_id`, with identical values.
        let yaml = r#"
id: trace-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let output = run_workflow_yaml(&workflow, &json!({"email_text":"hello"}), &MockExecutor)
            .await
            .expect("workflow should execute");

        let trace_id = output
            .trace_id
            .as_deref()
            .expect("trace_id should be present");
        assert!(!trace_id.is_empty());
        // The telemetry metadata block mirrors the same id.
        assert_eq!(
            output.metadata.as_ref().and_then(|value| {
                value
                    .get("telemetry")
                    .and_then(|telemetry| telemetry.get("trace_id"))
                    .and_then(Value::as_str)
            }),
            Some(trace_id)
        );
    }
6949
    #[tokio::test]
    async fn workflow_run_options_sample_rate_zero_marks_trace_unsampled() {
        // sample_rate 0.0 must keep the explicitly supplied trace id but mark
        // the trace as not sampled in metadata.telemetry.
        let yaml = r#"
id: sample-rate-zero
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0, // never sample
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-zero".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None, // no custom worker
            None, // no event sink
            &options,
        )
        .await
        .expect("workflow should execute");

        // The supplied id survives; only the sampled flag is affected.
        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-zero"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
7004
7005 #[test]
7006 fn trace_id_from_traceparent_parses_w3c_header() {
7007 let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
7008 assert_eq!(
7009 trace_id_from_traceparent(traceparent).as_deref(),
7010 Some("4bf92f3577b34da6a3ce929d0e0e4736")
7011 );
7012 }
7013
7014 #[test]
7015 fn resolve_telemetry_context_marks_traceparent_source() {
7016 let mut options = YamlWorkflowRunOptions::default();
7017 options.trace.context = Some(YamlWorkflowTraceContextInput {
7018 traceparent: Some(
7019 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
7020 ),
7021 ..YamlWorkflowTraceContextInput::default()
7022 });
7023
7024 let context = resolve_telemetry_context(&options, None);
7025
7026 assert_eq!(context.trace_id_source, TraceIdSource::Traceparent);
7027 assert_eq!(
7028 context.trace_id.as_deref(),
7029 Some("4bf92f3577b34da6a3ce929d0e0e4736")
7030 );
7031 }
7032
    #[tokio::test]
    async fn workflow_run_options_derives_trace_id_from_traceparent_when_trace_id_missing() {
        // With no explicit trace_id, the run must derive one from the W3C
        // traceparent header supplied in the trace context input.
        let yaml = r#"
id: traceparent-derived
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        // version-traceid-spanid-flags; the trace id is the second field.
        let traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.0, // never sample
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    traceparent: Some(traceparent.to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None, // no custom worker
            None, // no event sink
            &options,
        )
        .await
        .expect("workflow should execute");

        // The output trace id equals the header's trace-id field.
        assert_eq!(
            output.trace_id.as_deref(),
            Some("4bf92f3577b34da6a3ce929d0e0e4736")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(false)
        );
    }
7091
    #[tokio::test]
    async fn workflow_run_options_sample_rate_one_marks_trace_sampled() {
        // sample_rate 1.0 must mark the trace as sampled while preserving the
        // explicitly supplied trace id.
        let yaml = r#"
id: sample-rate-one
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.0, // always sample
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-one".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None, // no custom worker
            None, // no event sink
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-sample-one"));
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool),
            Some(true)
        );
    }
7146
    #[tokio::test]
    async fn workflow_run_options_sampling_is_deterministic_for_fixed_trace_id() {
        // With a fractional sample_rate and a fixed trace id, repeated runs
        // must all make the same sampling decision (whatever it is): the
        // decision is a function of the trace id, not a fresh random draw.
        let yaml = r#"
id: sample-rate-deterministic
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 0.5,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-sample-deterministic".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext::default(),
            },
            model: None,
        };

        // Run the identical workflow three times and collect the sampled flag.
        let mut sampled_values = Vec::new();
        for _ in 0..3 {
            let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
                &workflow,
                &json!({"email_text":"hello"}),
                &MockExecutor,
                None, // no custom worker
                None, // no event sink
                &options,
            )
            .await
            .expect("workflow should execute");

            let sampled = output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("sampled"))
                .and_then(Value::as_bool)
                .expect("sampled flag should be present");
            sampled_values.push(sampled);
        }

        // All three runs agree (the test does not care which way).
        assert!(sampled_values
            .iter()
            .all(|value| *value == sampled_values[0]));
    }
7206
    #[tokio::test]
    async fn workflow_run_options_reject_invalid_sample_rate() {
        // A sample_rate above 1.0 must be rejected as invalid input, with an
        // error message that names the offending option.
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: 1.1, // out of the valid [0.0, 1.0] range
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None, // no custom worker
            None, // no event sink
            &options,
        )
        .await
        .expect_err("invalid sample_rate should fail");

        match err {
            YamlWorkflowRunError::InvalidInput { message } => {
                assert!(message.contains("telemetry.sample_rate"));
            }
            _ => panic!("expected invalid input error for sample_rate"),
        }
    }
7250
    #[tokio::test]
    async fn workflow_run_options_reject_nan_sample_rate() {
        // NaN would defeat any range comparison, so it must be rejected as
        // invalid input just like an out-of-range value.
        let yaml = r#"
id: sample-rate-invalid
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                sample_rate: f32::NAN,
                ..YamlWorkflowTelemetryConfig::default()
            },
            ..YamlWorkflowRunOptions::default()
        };

        let err = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None, // no custom worker
            None, // no event sink
            &options,
        )
        .await
        .expect_err("nan sample_rate should fail");

        assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
    }
7289
7290 #[test]
7291 fn subworkflow_options_inherit_parent_telemetry_configuration() {
7292 let mut parent_options = YamlWorkflowRunOptions::default();
7293 parent_options.telemetry.sample_rate = 0.0;
7294 parent_options.telemetry.payload_mode = YamlWorkflowPayloadMode::RedactedPayload;
7295 parent_options.telemetry.tool_trace_mode = YamlToolTraceMode::Redacted;
7296 parent_options.trace.tenant.conversation_id = Some("conv-123".to_string());
7297
7298 let parent_context = TraceContext {
7299 trace_id: Some("trace-parent".to_string()),
7300 span_id: Some("span-parent".to_string()),
7301 parent_span_id: None,
7302 traceparent: Some("00-trace-parent-span-parent-01".to_string()),
7303 tracestate: None,
7304 baggage: BTreeMap::new(),
7305 };
7306
7307 let subworkflow_options =
7308 build_subworkflow_options(&parent_options, Some(&parent_context), None);
7309
7310 assert_eq!(subworkflow_options.telemetry.sample_rate, 0.0);
7311 assert_eq!(
7312 subworkflow_options.telemetry.payload_mode,
7313 YamlWorkflowPayloadMode::RedactedPayload
7314 );
7315 assert_eq!(
7316 subworkflow_options.telemetry.tool_trace_mode,
7317 YamlToolTraceMode::Redacted
7318 );
7319 assert_eq!(
7320 subworkflow_options.trace.tenant.conversation_id.as_deref(),
7321 Some("conv-123")
7322 );
7323 assert_eq!(
7324 subworkflow_options
7325 .trace
7326 .context
7327 .as_ref()
7328 .and_then(|ctx| ctx.trace_id.as_deref()),
7329 Some("trace-parent")
7330 );
7331 }
7332
    #[tokio::test]
    async fn workflow_run_options_use_explicit_trace_id_and_payload_mode() {
        // Explicit trace id, redacted payload mode, and a tenant conversation
        // id supplied via run options must all be reflected in the output:
        // trace_id on the output itself, payload_mode under
        // metadata.telemetry, and the tenant block under metadata.trace.
        let yaml = r#"
id: trace-options-test
entry_node: classify
nodes:
  - id: classify
    node_type:
      llm_call:
        model: gpt-4.1
        config:
          prompt: |
            Classify this email into exactly one category:
            {{ input.email_text }}
"#;

        let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
        let options = YamlWorkflowRunOptions {
            telemetry: YamlWorkflowTelemetryConfig {
                payload_mode: YamlWorkflowPayloadMode::RedactedPayload,
                ..YamlWorkflowTelemetryConfig::default()
            },
            trace: YamlWorkflowTraceOptions {
                context: Some(YamlWorkflowTraceContextInput {
                    trace_id: Some("trace-fixed-123".to_string()),
                    traceparent: Some("00-trace-fixed-123-span-1-01".to_string()),
                    ..YamlWorkflowTraceContextInput::default()
                }),
                tenant: YamlWorkflowTraceTenantContext {
                    conversation_id: Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42".to_string()),
                    ..YamlWorkflowTraceTenantContext::default()
                },
            },
            model: None,
        };

        let output = run_workflow_yaml_with_custom_worker_and_events_and_options(
            &workflow,
            &json!({"email_text":"hello"}),
            &MockExecutor,
            None, // no custom worker
            None, // no event sink
            &options,
        )
        .await
        .expect("workflow should execute");

        assert_eq!(output.trace_id.as_deref(), Some("trace-fixed-123"));
        // Payload mode serializes in snake_case.
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("telemetry"))
                .and_then(|telemetry| telemetry.get("payload_mode"))
                .and_then(Value::as_str),
            Some("redacted_payload")
        );
        assert_eq!(
            output
                .metadata
                .as_ref()
                .and_then(|value| value.get("trace"))
                .and_then(|trace| trace.get("tenant"))
                .and_then(|tenant| tenant.get("conversation_id"))
                .and_then(Value::as_str),
            Some("6e6d3125-b9f1-4af2-af1f-7cca024a2c42")
        );
    }
7401
7402 #[test]
7403 fn streamed_payload_parser_extracts_last_json_object() {
7404 let raw = r#"{"state":"missing_scenario","reason":"ok"}
7405
7406extra reasoning text
7407
7408{"state":"ready","reason":"final"}"#;
7409
7410 let resolved = parse_streamed_structured_payload(raw, false)
7411 .expect("parser should extract final JSON object");
7412 assert_eq!(resolved.payload["state"], "ready");
7413 assert!(resolved.heal_confidence.is_none());
7414 }
7415
7416 #[test]
7417 fn streamed_payload_parser_handles_unbalanced_reasoning_before_json() {
7418 let raw = "reasoning text with unmatched { braces and thoughts\n{\"state\":\"ready\",\"reason\":\"final\"}";
7419
7420 let resolved = parse_streamed_structured_payload(raw, false)
7421 .expect("parser should recover final structured JSON object");
7422 assert_eq!(resolved.payload["state"], "ready");
7423 }
7424
7425 #[test]
7426 fn streamed_payload_parser_handles_markdown_with_heal() {
7427 let raw = r#"Some preface
7428```json
7429{
7430 "state": "missing_scenario",
7431 "reason": "Need more details"
7432}
7433```
7434Some trailing explanation"#;
7435
7436 let resolved = parse_streamed_structured_payload(raw, true)
7437 .expect("heal path should parse JSON block");
7438 assert_eq!(resolved.payload["state"], "missing_scenario");
7439 assert!(resolved.heal_confidence.is_some());
7440 }
7441
7442 #[test]
7443 fn streamed_payload_parser_errors_when_no_json_candidate_exists() {
7444 let raw = "No JSON in this streamed output";
7445 let error = parse_streamed_structured_payload(raw, false)
7446 .expect_err("strict stream parse should fail without JSON candidate");
7447 assert!(error.contains("no JSON object candidate found"));
7448 }
7449
    #[test]
    fn include_raw_stream_debug_events_defaults_to_false() {
        // Serialize access to the process-wide env var through the shared
        // lock so parallel tests cannot race on it.
        let _guard = stream_debug_env_lock()
            .lock()
            .expect("stream debug env lock should not be poisoned");
        // With the variable unset, raw stream debug events are opt-in only.
        std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
        assert!(!include_raw_stream_debug_events());
    }
7458
7459 #[test]
7460 fn include_raw_stream_debug_events_accepts_truthy_values() {
7461 let _guard = stream_debug_env_lock()
7462 .lock()
7463 .expect("stream debug env lock should not be poisoned");
7464 std::env::set_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW", "true");
7465 assert!(include_raw_stream_debug_events());
7466 std::env::remove_var("SIMPLE_AGENTS_WORKFLOW_STREAM_INCLUDE_RAW");
7467 }
7468
7469 #[test]
7470 fn structured_json_delta_filter_strips_reasoning_prefix_and_suffix() {
7471 let mut filter = StructuredJsonDeltaFilter::default();
7472 let chunks = vec![
7473 "I will think first... ",
7474 "{\"state\":\"missing_scenario\",",
7475 "\"reason\":\"Need more details\"}",
7476 " additional commentary",
7477 ];
7478
7479 let filtered = chunks
7480 .into_iter()
7481 .filter_map(|chunk| filter.split(chunk).0)
7482 .collect::<String>();
7483
7484 assert_eq!(
7485 filtered,
7486 "{\"state\":\"missing_scenario\",\"reason\":\"Need more details\"}"
7487 );
7488 }
7489
7490 #[test]
7491 fn structured_json_delta_filter_handles_braces_inside_strings() {
7492 let mut filter = StructuredJsonDeltaFilter::default();
7493 let chunks = vec![
7494 "preface ",
7495 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}",
7496 " trailing",
7497 ];
7498
7499 let filtered = chunks
7500 .into_iter()
7501 .filter_map(|chunk| filter.split(chunk).0)
7502 .collect::<String>();
7503
7504 assert_eq!(
7505 filtered,
7506 "{\"reason\":\"brace } in text\",\"state\":\"ok\"}"
7507 );
7508 }
7509
7510 #[test]
7511 fn render_json_object_as_text_converts_top_level_fields() {
7512 let rendered =
7513 render_json_object_as_text(r#"{"question":"q","confidence":0.8,"nested":{"a":1}}"#);
7514 let lines: std::collections::HashSet<&str> = rendered.lines().collect();
7515
7516 assert_eq!(lines.len(), 3);
7517 assert!(lines.contains("question: q"));
7518 assert!(lines.contains("confidence: 0.8"));
7519 assert!(lines.contains("nested: {\"a\":1}"));
7520 }
7521
7522 #[test]
7523 fn stream_json_as_text_formatter_emits_once_when_complete() {
7524 let mut formatter = StreamJsonAsTextFormatter::default();
7525 formatter.push("{\"question\":\"hello\"}");
7526
7527 let first = formatter.emit_if_ready(true);
7528 let second = formatter.emit_if_ready(true);
7529
7530 assert_eq!(first, Some("question: hello".to_string()));
7531 assert_eq!(second, None);
7532 }
7533
7534 #[test]
7535 fn rewrite_yaml_condition_preserves_output_prefix_in_field_names() {
7536 let expr = "$.nodes.classify.output.output_total == 1";
7537 let rewritten = rewrite_yaml_condition_to_ir(expr);
7538 assert_eq!(rewritten, "$.node_outputs.classify.output_total == 1");
7539 }
7540
7541 #[tokio::test]
7542 async fn validates_workflow_input_before_ir_runtime_path() {
7543 let yaml = r#"
7544id: chat-workflow
7545entry_node: classify
7546nodes:
7547 - id: classify
7548 node_type:
7549 llm_call:
7550 model: gpt-4.1
7551 config:
7552 prompt: |
7553 classify
7554"#;
7555
7556 let workflow: YamlWorkflow = serde_yaml::from_str(yaml).expect("yaml should parse");
7557 let err = run_workflow_yaml(&workflow, &json!("not-an-object"), &MockExecutor)
7558 .await
7559 .expect_err("non-object input should fail before execution");
7560
7561 assert!(matches!(err, YamlWorkflowRunError::InvalidInput { .. }));
7562 }
7563
7564 #[test]
7565 fn interpolate_template_supports_dollar_prefixed_paths() {
7566 let context = json!({
7567 "input": {
7568 "email_text": "hello"
7569 }
7570 });
7571
7572 let rendered = interpolate_template("value={{ $.input.email_text }}", &context);
7573 assert_eq!(rendered, "value=hello");
7574 }
7575}